This documentation provides a comprehensive guide to integrating Node.js with the our MQTT broker (CrystalMQ) or any MQTT broker of your choice. It covers essential tasks such as establishing connections, subscribing to topics, unsubscribing, and exchanging messages. By following these steps, you can effectively implement MQTT communication within Node.js applications.
Before getting started, ensure you have the following:
npm install mqtt --save
This section has code snippets of various ways to connect to MQTT Broker. Ensure that the MQTT Broker supports the connection type that you would like to use. Also, obtain the corresponding connection parameters of the MQTT Broker (Address, Port, Username/Password, CA Certificate)
MQTT Over TCP
Use the following code to connect the client over TCP. Define the Macro ADDRESS using MQTT Broker's connection parameters.
For MQTT 3.1.1 :
const brokerUrl =
'mqtt://public-mqtt-broker.bevywise.com;
const conn_params = {clientId : "crystalmq_" +
Math.random().toString(36).substring(2, 12)};
const client = mqtt.connect(brokerUrl, conn_params);
For MQTT 5 :
const mqtt = require('mqtt');
const brokerUrl = 'mqtt://public-mqtt-broker.bevywise.com';
const conn_params = {
clientId: "crystalmq_" + Math.random().toString(36).substring(2,
12),
protocolVersion: 5 // Specify MQTT 5 protocol version
};
const client = mqtt.connect(brokerUrl, conn_params);
MQTT Over TLS / SSL
Use the following code to connect securely to MQTT Broker over TLS. Define the Macro ADDRESS using MQTT Broker's connection parameters.
// Define the MQTT broker address
const ADDRESS = 'mqtt://public-mqtt-broker.bevywise.com';
// TLS options
const tls_options = {
rejectUnauthorized: true, // Set to false if using self-signed
certificates
ca: [fs.readFileSync('/path/to/ca.crt')], // CA certificate(s)
cert: fs.readFileSync('/path/to/client.crt'), // Client certificate
key: fs.readFileSync('/path/to/client.key'), // Client private key
};
// MQTT connection options
const conn_params = {
clientId: "crystalmq_" + Math.random().toString(36).substring(2,
12),
protocolVersion: 4, // MQTT 3.1.1 protocol version
protocolId: 'MQTT',
clean: true,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
username: 'your-username',
password: 'your-password',
will: {
topic: 'will-topic',
payload: 'Connection Closed abnormally..!',
qos: 1,
retain: false,
},
keepalive: 60,
resubscribe: true,
rejectUnauthorized: true, // Optionally, set to false if using
self-signed certificates
ca: [fs.readFileSync('/path/to/ca.crt')], // CA certificate(s)
cert: fs.readFileSync('/path/to/client.crt'), // Client certificate
key: fs.readFileSync('/path/to/client.key'), // Client private key
};
// Connect to MQTT broker
const client = mqtt.connect(ADDRESS, conn_params);
Set TLS parameters before calling the MQTTClient_connect to connect the client to the mQTT Broker securely over TLS.
If the MQTT Broker is hosted in a trusted server and the server verification is not required, the following code can be used to set TLS Options:
// Define the MQTT broker address
const ADDRESS = 'mqtt://public-mqtt-broker.bevywise.com';
// TLS options
const tls_options = {
rejectUnauthorized: false, // Set to false to skip server
verification
};
// MQTT connection options
const conn_params = {
clientId: "crystalmq_" + Math.random().toString(36).substring(2,
12),
protocolVersion: 4, // MQTT 3.1.1 protocol version
protocolId: 'MQTT',
clean: true,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
username: 'your-username',
password: 'your-password',
will: {
topic: 'will-topic',
payload: 'Connection Closed abnormally..!',
qos: 1,
retain: false,
},
keepalive: 60,
resubscribe: true,
rejectUnauthorized: false, // Set to false to skip server
verification
};
// Connect to MQTT broker
const client = mqtt.connect(ADDRESS, { ...conn_params, ...tls_options
});
If the MQTT Broker has Server Certificate issued from a Trusted CA, then the Server Certificate can be verified using:
// Define the MQTT broker address
const ADDRESS = 'mqtt://public-mqtt-broker.bevywise.com';
// TLS options
const tls_options = {
ca: [fs.readFileSync('/path/to/ca.crt')], // Trusted CA certificates
};
// MQTT connection options
const conn_params = {
clientId: "crystalmq_" + Math.random().toString(36).substring(2,
12),
protocolVersion: 4, // MQTT 3.1.1 protocol version
protocolId: 'MQTT',
clean: true,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
username: 'your-username',
password: 'your-password',
will: {
topic: 'will-topic',
payload: 'Connection Closed abnormally..!',
qos: 1,
retain: false,
},
keepalive: 60,
resubscribe: true,
rejectUnauthorized: true, // Set to true to verify server
certificate
ca: [fs.readFileSync('/path/to/ca.crt')], // Trusted CA certificates
};
// Connect to MQTT broker
const client = mqtt.connect(ADDRESS, { ...conn_params, ...tls_options
});
If the MQTT Broker has a self-signed Server Certificate then the Server Certificate can be verified using the Root Certificate obtained from the MQTT Broker:
// Define the MQTT broker address
const ADDRESS = 'mqtt://public-mqtt-broker.bevywise.com';
// Root CA certificate (self-signed certificate from MQTT broker)
const rootCA = fs.readFileSync('/path/to/root-ca.crt');
// TLS options
const tls_options = {
ca: [rootCA], // Root CA certificate provided by the MQTT broker
};
// MQTT connection options
const conn_params = {
clientId: "crystalmq_" + Math.random().toString(36).substring(2,
12),
protocolVersion: 4, // MQTT 3.1.1 protocol version
protocolId: 'MQTT',
clean: true,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
username: 'your-username',
password: 'your-password',
will: {
topic: 'will-topic',
payload: 'Connection Closed abnormally..!',
qos: 1,
retain: false,
},
keepalive: 60,
resubscribe: true,
rejectUnauthorized: true, // Set to true to verify server
certificate
ca: [rootCA], // Root CA certificate provided by the MQTT broker
};
// Connect to MQTT broker
const client = mqtt.connect(ADDRESS, { ...conn_params, ...tls_options
});
MQTT Over WebSocket
Define the MQTT Broker Address like this to connect the client over WebSocket.
const brokerUrl = 'ws://public-mqtt-broker.bevywise.com:10443/mqtt';
const conn_params = {
clientId : "crystalmq_" + Math.random().toString(36).substring(2, 12),
username : "some username",
password : "some password"
};
const client = mqtt.connect(brokerUrl, conn_params);
MQTT Over Secure WebSocket
Use the following code to connect the client over Secure WebSocket. Set TLS Options as given in MQTT Over TLS section. Define the Macro ADDRESS using MQTT Broker's connection parameters.
const brokerUrl = 'wss://public-mqtt-broker.bevywise.com:11443/mqtt';
const conn_params = {
clientId : "crystalmq_" + Math.random().toString(36).substring(2,
12),
};
const client = mqtt.connect(brokerUrl, conn_params);
To connect to MQTT Broker that requires MQTT Username and MQTT Password for authentication, add to username and password to the connection options like this:
const brokerUrl = 'mqtt://public-mqtt-broker.bevywise.com;
const conn_params = {
clientId : "crystalmq_" + Math.random().toString(36).substring(2, 12),
username : "some username",
password : "some password"
};
const client = mqtt.connect(brokerUrl, conn_params);
Setting Up Last Will & Testament
Configure the Last Will and Testament feature to specify a message that the broker will publish if the client unexpectedly disconnects. This helps inform other subscribers of the disconnected client's status.
Use the following code to set Last Will in the Connection Options:
const brokerUrl = 'mqtt://public-mqtt-broker.bevywise.com;
const conn_params = {
clientId : "crystalmq_" + Math.random().toString(36).substring(2, 12),
will: {
topic: 'lastwill',
payload: 'Goodbye, CrystalMQ!'
}
};
const client = mqtt.connect(brokerUrl, conn_params);
Adjusting Keep Alive
MQTT maintains client-broker connections with a keep-alive mechanism. Adjust the keep-alive interval to control how frequently the client sends PINGREQ messages to the broker.
Modify the code below to suit your requirements:
const conn_params = {
clientId : "crystalmq_" + Math.random().toString(36).substring(2, 12),
keepalive : 60,
}
};
Configuring Session Persistence
Session data of an MQTT Client include the Subscriptions made by the Client and any data that the Client would receive with QoS>0. The Client can get the MQTT Broker to store its session data across connections.
MQTT 3.1.1 Clients can set Clean Session = 0 to request the MQTT Broker to keep its session information stored across connections.
For MQTT 3.1.1 :
const conn_params = {
clientId : "crystalmq_" + Math.random().toString(36).substring(2, 12),
clean: true,
}
};
MQTT 5 Clients can set Clean Start = 0 and Session Expiry Interval = 'N' to request the MQTT Broker to keep its session information stored across connections for 'N' seconds.
For MQTT 5 :
// MQTT connection options for MQTT 5
const conn_params = {
clientId: "crystalmq_" + Math.random().toString(36).substring(2, 12),
clean: true,
protocolVersion: 5 // Specify MQTT 5 protocol version
};
Setting Maximum Packet Size
MQTT5 Client can request the MQTT Broker to only send data packets less than a specific size by setting it like this:
For MQTT 5 :
const conn_params = {
clientId : "crystalmq_" + Math.random().toString(36).substring(2, 12),
protocolVersion: 5,
properties: {
maximumPacketSize: 1024, // Maximum packet size
}
};
Sending Data
Efficiently distribute data to multiple subscribers by publishing it to designated topics with the following code snippet:
For MQTT 3.1.1 :
client.publish('testtopic', 'Hello, CrystalMQ!', (err) => {
if (err) {
console.error('Failed to publish message:', err);
} else {
console.log('Message published');
}
});
For MQTT 5 :
client.publish('testtopic', 'Hello, CrystalMQ!', {
qos: 1, // Quality of Service level (0, 1, or 2)
retain: false, // Retain flag
properties: { // MQTT 5 properties
payloadFormatIndicator: 1,
messageExpiryInterval: 60, // Message expiry interval in seconds
topicAlias: 1,
responseTopic: 'responseTopic',
correlationData: Buffer.from('correlationData'),
userProperties: {
'key': 'value',
},
}
}, (err) => {
if (err) {
console.error('Failed to publish message:', err);
} else {
console.log('Message published');
}
});
Setting Retained Messages
Enable the retain flag when publishing a message to ensure the broker stores the last message for each topic. This guarantees that new subscribers receive the most recent message upon connecting.
To implement this, use the following code snippet:
client.publish('testtopic', 'Hello, CrystalMQ!', { retain: true },
(err) =>
{
if (err) {
console.error('Failed to publish message:', err);
} else {
console.log('Message published with retain flag set to true');
}
});
Specifying QoS Levels
MQTT provides three levels of Quality of Service (QoS) for message delivery:
client.publish('testtopic', 'Hello, Crysql MQ!', { qos: 1 }); // 0 or 1 or 2
Publish properties are attributes or settings associated with MQTT messages that provide additional context or instructions for handling the message by the broker or other clients. These properties can include message expiry interval, QoS level, content type, response topic, etc. They allow for more precise control over how messages are published and processed, ensuring efficient and reliable message delivery in MQTT communication.
Message Expiry Interval
The 'Message expiry interval' property sets a message's life span in seconds; if undelivered within this time, the broker discards it. MQTT5 supports this feature. MQTT5 Clients can set this while publishing data.
For MQTT 5 :
const topic = "testtopic";
const payload = "Hello, CrystalMQ!"
const payload_prop = {
qos: 1,
properties: {
messageExpiryInterval: 60, // Message expiry interval in seconds
}
};
client.publish(topic, payload, payload_prop, (err) => {
if (err) {
console.error('Failed to publish message:', err);
} else {
console.log('Message published with retain flag set to true');
}
});
Topic Alias
The 'Topic Alias' property allows clients to use a short alias instead of a full topic name, reducing message packet size and improving network efficiency.
For MQTT 5 :
const topic = "testtopic";
const payload = "Hello, CrystalMQ!"
const payload_prop = {
qos: 1,
properties: {
topicAlias: 1, // Set the Topic Alias to be used for this message
}
};
client.publish(topic, payload, payload_prop, (err) => {
if (err) {
console.error('Failed to publish message:', err);
} else {
console.log('Message published with retain flag set to true');
}
});
Properties associated with MQTT PUBLISH enhance message handling, providing context or instructions for brokers and clients. These properties, including message expiry intervals and topic aliases, optimize message delivery and network bandwidth.
Subscribing to Topic Filter
To receive data published by other clients, this client has to subscribe to a matching Topic Filter like this:
client.subscribe('testtopic', (err) => {
if (err) {
console.error('Subscription error:', err);
} else {
console.log(`Subscribed`);
}
});
This topic filter can match with an exact topic or it can have wildcards like # and +
Receiving Data
To receive data sent for the subscriptions, a callback function needs to be defined like this:
client.on('connect', function () {
console.log('Connected to MQTT broker');
// Subscribe to a topic
client.subscribe('testtopic', { qos: 1 }, function (err) {
if (err) {
console.error('Failed to subscribe to topic:', err);
} else {
console.log('Subscribed to topic');
}
});
});
Unsubscribing from Topics
To stop receiving updates from a topic, use the code provided to unsubscribe.
client.unsubscribe('testtopic', (err) => {
if (err) {
console.log('unsubscribe error:', err)
return
}
console.log(`unsubscribed.`)
})
Ensure a proper termination of your client's connection with the broker to avoid issues and resource leaks on both sides, thereby maintaining system stability.
Use the following code to disconnect the client from the broker:
client.end();
You have the opportunity to develop and customize your own intricate business logic within this environment, tailoring it precisely to your specific needs and objectives.
Individual Client Identification
Assign a unique client ID to each device for accurate identification purposes. For private deployments, ensure each client receives a distinct ID, while in shared environments, incorporate a random string into client IDs to ensure uniqueness.
Structured Data Organization
Plan your data structure thoughtfully. Whether handling plain text, JSON data formats, or numeric data, ensure the design is tailored to fit your application's specific needs.
Robust Error Handling
Implement strong error management to handle MQTT connection failures, subscription problems, and message publishing errors effectively.
Securing Credentials
Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.
Regular Testing & Monitoring
Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.
Optimizing Session Management
Choose between clean and persistent sessions (`clean: true` or `clean: false`) based on your need to retain subscriptions and queued messages across client connections.
Reconnect on Disconnect
Add code to attempt reconnection to the MQTT Broker when there is an unexpected disconnection. This will ensure that your client stays connected and does not lose any data.
Download the complete code for client that uses Node JS MQTT Client Library to connect with our CrystalMQ broker or any broker of your choice.
For MQTT 3.1.1 :
const mqtt = require('mqtt');
// const fs = require('fs');
const brokerUrl = 'mqtt://public-mqtt-broker.bevywise.com';
const conn_params = {
clientId : "crystalmq_" + Math.random().toString(36).substring(2, 12),
// keepalive : 60, // keep alive in seconds
// host : "public-mqtt-broker.bevywise.com" // Host
// protocol: 'mqtts', // protocol
// clean: true, // Clean session
// username : "some username", // mqtt username
// password : "some password", // mqtt password
// will: {
// topic: 'lastwill', // Will topic
// payload: 'Goodbye, CrystalMQ!' // will message
// }
// ca: [fs.readFileSync('/path/to/root.crt')],
};
const publishInterval = 2000;
const client = mqtt.connect(brokerUrl, conn_params);
const topic = 'testtopic'; // publish topic
const message = "Hello CrystalMQ !" // published message
client.on('connect', () => {
console.log('Connected to MQTT broker');
setInterval(() => {
client.publish(topic, message, (err) => {
if (err) {
console.error('Error publishing message:', err);
} else {
console.log('Message published:', message);
}
});
}, publishInterval);
});
client.on('message', (topic, message) => {
console.log(`Received message: ${message.toString()} on topic:
${topic}`);
});
client.on('error', (err) => {
console.error('Error: ', err);
});
client.on('close', () => {
console.log('Disconnected from MQTT broker');
});
client.on('reconnect', () => {
console.log('Reconnecting to MQTT broker...');
});
client.on('offline', () => {
console.error('MQTT client is offline');
});
For MQTT 5
const mqtt = require('mqtt');
const brokerUrl = 'mqtt://public-mqtt-broker.bevywise.com';
const conn_params = {
clientId: "crystalmq_" + Math.random().toString(36).substring(2, 12),
protocolVersion: 5, // Specify MQTT 5 protocol version
clean: true, // Clean session
// Uncomment and set these options if needed
// keepalive: 60, // Keep alive in seconds
// username: "some username", // MQTT username
// password: "some password", // MQTT password
// will: {
// topic: 'lastwill', // Will topic
// payload: 'Goodbye, CrystalMQ!', // Will message
// },
// ca: [fs.readFileSync('/path/to/root.crt')],
};
const publishInterval = 2000;
const client = mqtt.connect(brokerUrl, conn_params);
const topic = 'testtopic'; // Publish topic
const message = "Hello CrystalMQ!"; // Published message
client.on('connect', () => {
console.log('Connected to MQTT broker');
setInterval(() => {
client.publish(topic, message, { qos: 1 }, (err) => {
if (err) {
console.error('Error publishing message:', err);
} else {
console.log('Message published:', message);
}
});
}, publishInterval);
});
client.on('message', (topic, message, packet) => {
console.log(`Received message: ${message.toString()} on topic:
${topic}`);
// Handle MQTT 5 properties if needed
console.log('Properties:', packet.properties);
});
client.on('error', (err) => {
console.error('Error: ', err);
});
client.on('close', () => {
console.log('Disconnected from MQTT broker');
});
client.on('reconnect', () => {
console.log('Reconnecting to MQTT broker...');
});
client.on('offline', () => {
console.error('MQTT client is offline');
});
Steps to Create an Executable File Using `pkg`
1. Install `pkg` globally
Open your terminal or command prompt and run the following command to install `pkg`:
npm install -g pkg
2. Prepare Your Node.js Application
Ensure your `package.json` includes a `bin` property that points to your main script. For example, if your main script is `index.js`, your `package.json` should look something like this:
{
"name": "your-app-name",
"version": "1.0.0",
"main": "index.js",
"bin": "index.js",
"scripts": {
"start": "node index.js"
},
"dependencies": {
"mqtt": "^4.2.8"
}
}
3. Create Executables
Use `pkg` to create executables for different operating systems. Run the following commands in your terminal:
pkg . --targets node16-linux-x64,node16-win-x64
This command specifies the targets as Node.js version 16 for both Linux and Windows 64-bit architectures. Adjust the version and architecture as needed.
4. Distribute Your Executables
After running the `pkg` command, you will find the executables in the current directory. You can distribute these executable files to users. They will be named based on your application name and the target operating system, such as `your-app-name-linux` and `your-app-name-win.exe`.
Connect your client to our state-of-the-art MQTT broker or any broker of your choice. This powerful combination will ensure optimal performance and reliability for all your messaging needs, paving the way for a robust and efficient system integration.