
Connect MQTT Clients Using Paho C

Introduction

This documentation guides you through connecting MQTT clients to our CrystalMQ broker, or any broker of your choice, using the Eclipse Paho C library.

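The Eclipse Paho C client library used in this guide (paho.mqtt.c, which provides MQTTAsync.h) targets desktop and server operating systems such as Linux, macOS, and Windows. Eclipse Paho Embedded C is a separate library designed primarily for embedded environments such as mbed, Arduino, and FreeRTOS.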

Pre-requisites for MQTT client connection

Before connecting MQTT clients to the MQTT broker, ensure the following prerequisites are met:

  • Any MQTT broker of your choice installed and running.
  • Eclipse Paho C library installed.
  • Access credentials (username/password) for MQTT broker authentication if required.

Dependency installation

Eclipse Paho MQTT C Library

This library must be included in the Client code as:

#include "MQTTAsync.h"

This library can be built from source.

Linux/macOS

git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
make
sudo make install

Windows

mkdir build.paho
cd build.paho
call "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall.bat" x64
cmake -G "NMake Makefiles" -DPAHO_WITH_SSL=TRUE -DPAHO_BUILD_DOCUMENTATION=FALSE -DPAHO_BUILD_SAMPLES=TRUE -DCMAKE_BUILD_TYPE=Release -DCMAKE_VERBOSE_MAKEFILE=TRUE ..
nmake

Connecting to MQTT Broker

This section shows several ways to connect to an MQTT Broker. Ensure that the MQTT Broker supports the connection type that you would like to use. Also, obtain the corresponding connection parameters of the MQTT Broker (Address, Port, Username/Password, CA Certificate).
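
As a small illustration of how these connection parameters combine, the sketch below builds the address string passed to MQTTAsync_create from a scheme, host, and port. The helper name build_broker_uri is hypothetical and not part of the Paho library; the schemes tcp, ssl, ws, and wss are the ones used in this guide.

```c
#include <stdio.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper (not part of Paho): writes "scheme://host:port" into buf.
 * Returns 0 on success, -1 if an argument is invalid or buf is too small. */
int build_broker_uri(char *buf, size_t buflen,
                     const char *scheme, const char *host, int port)
{
    if (buf == NULL || scheme == NULL || host == NULL)
        return -1;
    if (strlen(scheme) == 0 || strlen(host) == 0)
        return -1;
    if (port < 1 || port > 65535)
        return -1;

    int n = snprintf(buf, buflen, "%s://%s:%d", scheme, host, port);

    /* snprintf returns the length it needed; a value >= buflen means truncation. */
    if (n < 0 || (size_t)n >= buflen)
        return -1;
    return 0;
}
```

For example, calling build_broker_uri(uri, sizeof uri, "tcp", "localhost", 1883) fills uri with tcp://localhost:1883, which could then be used in place of the ADDRESS macro.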

MQTT Over TCP

Use the following code to connect the client over TCP. Define the Macro ADDRESS using MQTT Broker's connection parameters.

MQTT 3.1.1

#define ADDRESS "tcp://public-mqtt-broker.bevywise.com:1883"
#define CLIENTID "crystalmq_testclient"
MQTTAsync client;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}

MQTT 5.0

// Create Client
MQTTAsync client;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
create_opts.MQTTVersion = MQTTVERSION_5;
int rc = MQTTAsync_createWithOptions(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
if (rc != MQTTASYNC_SUCCESS)
{
fprintf(stderr, "Failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
exit(EXIT_FAILURE);
}

// Connect Client
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer5;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}

MQTT Over TLS / SSL

Use the following code to connect securely to MQTT Broker over TLS. Define the Macro ADDRESS using MQTT Broker's connection parameters.

#define ADDRESS "ssl://freemqttbroker.sfodo.mqttserver.com:8883"

Set the TLS parameters before calling MQTTAsync_connect to connect the client to the MQTT Broker securely over TLS.

If the MQTT Broker is hosted in a trusted server and the server verification is not required, the following code can be used to set TLS Options:

MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;
ssl_opts.enableServerCertAuth = 0;
ssl_opts.verify = 0;
conn_opts.ssl = &ssl_opts;

If the MQTT Broker has Server Certificate issued from a Trusted CA, then the Server Certificate can be verified using:

MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;
ssl_opts.enableServerCertAuth = 1;
ssl_opts.verify = 1;
conn_opts.ssl = &ssl_opts;

If the MQTT Broker has a self-signed Server Certificate then the Server Certificate can be verified using the Root Certificate obtained from the MQTT Broker:

MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
ssl_opts.sslVersion = MQTT_SSL_VERSION_TLS_1_2;
ssl_opts.enableServerCertAuth = 1;
ssl_opts.verify = 1;
ssl_opts.trustStore = "./root.crt"; // PEM file with the broker's root certificate (CApath expects a directory)
conn_opts.ssl = &ssl_opts;

MQTT Over WebSocket

Define the MQTT Broker Address like this to connect the client over WebSocket.

#define ADDRESS "ws://freemqttbroker.sfodo.mqttserver.com:10443"

MQTT Over Secure WebSocket

Use the following code to connect the client over Secure WebSocket. Set TLS Options as given in MQTT Over TLS section. Define the Macro ADDRESS using MQTT Broker's connection parameters.

#define ADDRESS "wss://public-mqtt-broker.bevywise.com:11443"

Configuring MQTT Authentication

To connect to an MQTT Broker that requires an MQTT Username and MQTT Password for authentication, add the username and password to the connection options like this:

MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.username = "your-mqtt-username";
conn_opts.password = "your-mqtt-password";

Advanced Features

Setting Up Last Will & Testament

Configure the Last Will and Testament feature to specify a message that the broker will publish if the client unexpectedly disconnects. This helps inform other subscribers of the disconnected client's status.

Use the following code to set Last Will in the Connection Options:

MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
will_opts.topicName = "MQTT/Will"; //Change this to will topic of your choice
will_opts.qos = 0; //Change this to QoS of your choice
will_opts.message = "Unexpectedly Disconnected";
conn_opts.will = &will_opts;

Adjusting Keep Alive

Adjust the keep-alive interval to control how frequently the client sends PINGREQ messages to the broker.

Modify the code below to suit your requirements:

MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
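
As a rough guide to choosing a value: the MQTT specification has the broker close the connection when it has received nothing from the client for one and a half times the keep-alive interval. The helper below only does that arithmetic. Its name is hypothetical, it is not part of the Paho API, and it rounds down to whole seconds.

```c
/* Hypothetical helper: seconds of silence after which a broker following the
 * MQTT specification treats the client as disconnected (1.5 x keep-alive).
 * A keep-alive of 0 disables the mechanism, so 0 is returned in that case. */
int keepalive_timeout_seconds(int keep_alive_interval)
{
    if (keep_alive_interval <= 0)
        return 0;
    /* Integer arithmetic: interval + interval/2, rounded down. */
    return keep_alive_interval + keep_alive_interval / 2;
}
```

With the keepAliveInterval of 20 used above, this gives 30 seconds, which is roughly how long it can take before the Last Will message is published after a silent connection loss.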

Configuring Session Persistence

Session data of an MQTT Client include the Subscriptions made by the Client and any data that the Client would receive with QoS>0. The Client can get the MQTT Broker to store its session data across connections.

MQTT 3.1.1 Clients can set Clean Session = 0 to request the MQTT Broker to keep its session information stored across connections.

MQTT 3.1.1

conn_opts.cleansession = 0;

MQTT 5 Clients can set Clean Start = 0 and Session Expiry Interval = 'N' to request the MQTT Broker to keep its session information stored across connections for 'N' seconds.

MQTT 5

MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer5;
conn_opts.cleanstart = 0; // Keep the existing session instead of starting a new one
MQTTProperties props = MQTTProperties_initializer;
MQTTProperty property1;
property1.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
property1.value.integer4 = 90; // Session Expiry Interval is a four-byte integer (seconds)
MQTTProperties_add(&props, &property1);
conn_opts.connectProperties = &props;

Setting Maximum Packet Size

An MQTT 5 Client can ask the MQTT Broker not to send it packets larger than a specific size (in bytes) by setting the Maximum Packet Size property like this:

MQTT 5

MQTTProperty property2;
property2.identifier = MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE;
property2.value.integer4 = 4094;
MQTTProperties_add(&props, &property2);

Publish

Sending Data

Efficiently distribute data to multiple subscribers by publishing it to designated topics with the following code snippet:

MQTT 3.1.1

#define TOPIC "device/status"
#define PAYLOAD "ON"
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = 1;
pubmsg.retained = 0;
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, NULL)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}

MQTT 5

MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &pub_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}

Setting Retained Messages

Enable the retain flag when publishing a message to ensure the broker stores the last message for each topic. This guarantees that new subscribers receive the most recent message upon connecting.

To implement this, use the following code snippet:

pubmsg.retained = 1;

Specifying QoS Levels

MQTT provides three levels of Quality of Service (QoS) for message delivery:

  • QoS 0 (At most once)
  • QoS 1 (At least once)
  • QoS 2 (Exactly once)

Specify the required QoS level when publishing MQTT messages using this code:

pubmsg.qos = 1; // Set to 0 or 1 or 2

Message Expiry Interval

The 'Message expiry interval' property sets a message's life span in seconds; if undelivered within this time, the broker discards it. MQTT5 supports this feature. MQTT5 Clients can set this while publishing data.

MQTT5

MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
pub_opts.context = client;
MQTTProperties pub_props = MQTTProperties_initializer;
MQTTProperty property3;
property3.identifier = MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL;
property3.value.integer4 = 90;
MQTTProperties_add(&pub_props, &property3);
pub_opts.properties = pub_props;

Topic Alias

The 'Topic Alias' property allows clients to use a short alias instead of a full topic name, reducing message packet size and improving network efficiency.

MQTT5

MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
pub_opts.context = client;
MQTTProperties pub_props = MQTTProperties_initializer;
MQTTProperty property3;
property3.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
property3.value.integer2 = 10;
MQTTProperties_add(&pub_props, &property3);
pub_opts.properties = pub_props;

Properties associated with MQTT PUBLISH enhance message handling, providing context or instructions for brokers and clients. These properties, including message expiry intervals and topic aliases, optimize message delivery and network bandwidth.

Subscribe

Subscribing to Topic Filter

To receive data published by other clients, this client has to subscribe to a matching Topic Filter like this:

#define FILTER "your-topic-filter"
#define QOS 1
if ((rc = MQTTAsync_subscribe(client, FILTER, QOS, NULL)) != MQTTASYNC_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}

The topic filter can match one exact topic, or it can contain the wildcards + (exactly one topic level) and # (all remaining levels).
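
To make the wildcard rules concrete, here is a minimal sketch of how a filter is compared with a topic name. The matching itself is done by the broker; the function name topic_matches_filter is hypothetical, and the code assumes a well-formed filter (each wildcard occupies a whole level and # appears only at the end).

```c
#include <stdbool.h>

/* Hypothetical illustration of MQTT topic filter matching.
 * '+' matches exactly one level, '#' matches all remaining levels. */
bool topic_matches_filter(const char *filter, const char *topic)
{
    /* Topics starting with '$' are not matched by a leading wildcard. */
    if (topic[0] == '$' && (filter[0] == '#' || filter[0] == '+'))
        return false;

    const char *f = filter;
    const char *t = topic;
    while (*f != '\0') {
        if (*f == '#') {
            /* Multi-level wildcard: valid only as the last character. */
            return f[1] == '\0';
        }
        if (*f == '+') {
            /* Single-level wildcard: skip one whole topic level. */
            while (*t != '\0' && *t != '/')
                t++;
            f++;
        } else if (*f == *t) {
            f++;
            t++;
        } else if (*t == '\0' && f[0] == '/' && f[1] == '#' && f[2] == '\0') {
            /* "a/#" also matches the parent level "a". */
            return true;
        } else {
            return false;
        }
    }
    /* The filter is used up; the topic must be used up as well. */
    return *t == '\0';
}
```

Under these rules the filter sensor/+/temperature matches sensor/kitchen/temperature but not sensor/kitchen/oven/temperature, while sensor/# matches both.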

Receiving Data

To receive data sent for the subscriptions, a callback function needs to be defined like this:

/* This callback is triggered when this client has received a message */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
/* Register the callback after creating the client and before connecting */
MQTTAsync_setCallbacks(client, NULL, NULL, msgarrvd, NULL);

Unsubscribing from Topics

To stop receiving updates from a topic, use the code provided to unsubscribe.

#define FILTER "your-topic-filter"
if ((rc = MQTTAsync_unsubscribe(client, FILTER, NULL)) != MQTTASYNC_SUCCESS)
{
printf("Failed to unsubscribe, return code %d\n", rc);
}

Disconnecting the Client

Ensure a proper termination of your client's connection with the broker to avoid issues and resource leaks on both sides, thereby maintaining system stability.

Use the following code to disconnect the client from the broker:

MQTTAsync_disconnect(client, NULL);
MQTTAsync_destroy(&client);

Building Your Business Logic

You have the opportunity to develop and customize your own intricate business logic within this environment, tailoring it precisely to your specific needs and objectives.

Implementing Best Practices

Client ID Assignment

Assign a specific client ID to every device to maintain accurate identification. In private instances, allocate unique IDs to individual clients, and in shared environments, attach a random string to each client ID for uniqueness assurance.

Designing data

Organize your data structure with careful planning. Whether managing plain text, JSON formatting, or numeric data, ensure the design is customized to suit the specific requirements of your application.
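
As one possible payload design, the sketch below formats a sensor reading as a small JSON object with snprintf. The function name format_reading and the field names sensor and value are assumptions made for illustration, and the sensor name is assumed to contain no characters that need JSON escaping.

```c
#include <stdio.h>
#include <stddef.h>

/* Hypothetical helper: writes {"sensor":"<name>","value":<x.xx>} into buf.
 * Returns the payload length, or -1 if buf is too small. */
int format_reading(char *buf, size_t buflen, const char *sensor, double value)
{
    int n = snprintf(buf, buflen,
                     "{\"sensor\":\"%s\",\"value\":%.2f}", sensor, value);

    /* A return value >= buflen means the output was truncated. */
    if (n < 0 || (size_t)n >= buflen)
        return -1;
    return n;
}
```

The buffer and the returned length could then be assigned to pubmsg.payload and pubmsg.payloadlen before publishing.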

Robust Error Handling

Implement strong error management to handle MQTT connection failures, subscription problems, and message publishing errors effectively.

Securing Credentials

Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.
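
A minimal sketch of the environment-variable approach follows. The helper name env_or_default and the variable names MQTT_USERNAME and MQTT_PASSWORD are assumptions for illustration, not names defined by Paho or the broker.

```c
#include <stdlib.h>

/* Hypothetical helper: returns the value of the environment variable `name`,
 * or `fallback` when the variable is unset or empty. */
const char *env_or_default(const char *name, const char *fallback)
{
    const char *value = getenv(name);
    if (value == NULL || value[0] == '\0')
        return fallback;
    return value;
}
```

The connection options could then be filled with conn_opts.username = env_or_default("MQTT_USERNAME", NULL); and conn_opts.password = env_or_default("MQTT_PASSWORD", NULL); so that no credential appears in the source file.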

Regular Testing & Monitoring

Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.

Optimizing Session Management

Choose between clean and persistent sessions (`conn_opts.cleansession` for MQTT 3.1.1, `conn_opts.cleanstart` together with a Session Expiry Interval for MQTT 5) based on your need to retain subscriptions and queued messages across client connections.

Reconnect on Disconnect

Add code that attempts to reconnect to the MQTT Broker after an unexpected disconnection. This keeps downtime short, and combined with a persistent session and QoS 1 or 2 it lets the client receive messages that were queued while it was offline.
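
One common way to space out reconnection attempts is exponential backoff: wait a short time after the first failure and double the wait after each further failure, up to a limit. The sketch below only computes the delay. The function name reconnect_delay_seconds is hypothetical, and the code that actually calls MQTTAsync_connect again is left to the application. Depending on the library version, the Paho connect options may also offer built-in automatic reconnection, so it is worth checking MQTTAsync.h before writing your own loop.

```c
/* Hypothetical helper: delay in seconds before reconnect attempt `attempt`
 * (0-based). Starts at min_delay and doubles per attempt, capped at max_delay. */
int reconnect_delay_seconds(int attempt, int min_delay, int max_delay)
{
    if (min_delay < 1)
        min_delay = 1;
    if (max_delay < min_delay)
        max_delay = min_delay;

    int delay = min_delay;
    for (int i = 0; i < attempt && delay < max_delay; i++) {
        /* Clamp before doubling so the value never overflows or passes the cap. */
        if (delay > max_delay / 2)
            delay = max_delay;
        else
            delay *= 2;
    }
    return delay;
}
```

With a minimum of 1 second and a maximum of 60 seconds, successive attempts wait 1, 2, 4, 8, 16, 32, and then 60 seconds.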

Download Code

Download the complete code for a client that uses the Paho C MQTT Client Library to connect to our CrystalMQ broker or any broker of your choice.

MQTT 3.1.1

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "MQTTAsync.h"

#define ADDRESS "tcp://localhost:1883"
#define TOPIC "sensor/temperature"
#define PAYLOAD "180"
#define QOS 1
#define TIMEOUT 10000L

int finished = 0;

void generate_clientid(char *str, int str_length)
{
srand(time(NULL));
const char *alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz", *PREFIX = "crystalmq_";
strncpy(str, PREFIX, strlen(PREFIX));
for (int i = 0; i < (str_length-strlen(PREFIX)); ++i) {
str[10 + i] = alphanum[rand() % 62]; // 62 is the length of the alphanum string
}
str[str_length] = '\0';
}

void onDisconnect(void* context, MQTTAsync_successData* response)
{
printf("Successful disconnection\n");
finished = 1;
}

void onSubscribe(void* context, MQTTAsync_successData* response)
{
printf("Subscribe succeeded\n");
}

void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
}

void onConnect(void* context, MQTTAsync_successData* response)
{
printf("Successful connection\n");
}

int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}

int main(int argc, char* argv[])
{
// Create Client
char clientid[21];
generate_clientid(clientid, 20);
MQTTAsync client;
MQTTAsync_create(&client, ADDRESS, clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, NULL, NULL, msgarrvd, NULL);

MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 60;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
int rc;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
sleep(1);

// Subscribe
MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
sub_opts.onSuccess = onSubscribe;
sub_opts.context = client;
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &sub_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}

// Publish
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, NULL)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}

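sleep(1);

// Disconnect; onDisconnect sets finished once the disconnect completes
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
disc_opts.onSuccess = onDisconnect;
disc_opts.context = client;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}

while (!finished)
#if defined(WIN32) || defined(WIN64)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&client);
return rc;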
}

MQTT 5

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "MQTTAsync.h"

#define ADDRESS "tcp://localhost:1883"
#define TOPIC "sensor/humidity"
#define PAYLOAD "20.3"
#define QOS 1
#define TIMEOUT 10000L

int finished = 0;

void generate_clientid(char *str, int str_length)
{
srand(time(NULL));
const char *alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz", *PREFIX = "crystalmq_";
strncpy(str, PREFIX, strlen(PREFIX));
for (int i = 0; i < (str_length-strlen(PREFIX)); ++i) {
str[10 + i] = alphanum[rand() % 62]; // 62 is the length of the alphanum string
}
str[str_length] = '\0';
}

void onDisconnect(void* context, MQTTAsync_successData* response)
{
printf("Successful disconnection\n");
finished = 1;
}

void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
}

void onConnect(void* context, MQTTAsync_successData* response)
{
printf("Successful connection\n");
}

int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}

int main(int argc, char* argv[])
{
// Create Client
char clientid[21];
generate_clientid(clientid, 20);
MQTTAsync client;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
create_opts.MQTTVersion = MQTTVERSION_5;
int rc = MQTTAsync_createWithOptions(&client, ADDRESS, clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
MQTTAsync_setCallbacks(client, NULL, NULL, msgarrvd, NULL);

// Set CONNECT Properties
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer5;
conn_opts.keepAliveInterval = 20;
conn_opts.cleanstart = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;

MQTTProperties props = MQTTProperties_initializer;
// // Session Expiry Interval
MQTTProperty property1;
property1.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
property1.value.integer4 = 90; // Session Expiry Interval is a four-byte integer
MQTTProperties_add(&props, &property1);

// // Maximum Packet Size
MQTTProperty property2;
property2.identifier = MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE;
property2.value.integer4 = 4094;
MQTTProperties_add(&props, &property2);

conn_opts.connectProperties = &props;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
sleep(1);

// Subscribe
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, NULL)) != MQTTASYNC_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
sleep(1);

// Publish
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
// Set Publish Properties
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
pub_opts.context = client;
MQTTProperties pub_props = MQTTProperties_initializer;
MQTTProperty property3;
property3.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
property3.value.integer2 = 90; // Topic Alias is a two-byte integer
MQTTProperties_add(&pub_props, &property3);
pub_opts.properties = pub_props;

if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &pub_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}

sleep(1);

if ((rc = MQTTAsync_unsubscribe(client, TOPIC, NULL)) != MQTTASYNC_SUCCESS)
{
printf("Failed to unsubscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
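// Disconnect; onDisconnect sets finished once the disconnect completes
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
disc_opts.onSuccess = onDisconnect;
disc_opts.context = client;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}

while (!finished)
#if defined(WIN32) || defined(WIN64)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&client);
return rc;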
}

Create Executable Bundle

Here is how this C client can be compiled into an executable with gcc. Link against one asynchronous Paho library: paho-mqtt3as if the client uses TLS, otherwise paho-mqtt3a.

$ gcc -o mqttclient mqttclient.c -lpaho-mqtt3as
$ ./mqttclient

Connect your client to our CrystalMQ broker or any broker of your choice to put the steps in this guide into practice.