Home

Awesome

The KubeMQ SDK for NodeJS enables Node JS/Typescript developers to seamlessly communicate with the KubeMQ server, implementing various communication patterns such as Events, EventStore, Commands, Queries, and Queues.

Prerequisites

Installation

The recommended way to use the SDK for Node in your project is to consume it from Node package manager.


npm install kubemq-js

Payload Details

KubeMQ PubSub Client

For executing PubSub operation we have to create the instance of PubsubClient, its instance can be created with minimum two parameter address (KubeMQ server address) & clientId . With these two parameter plainText connections are established. The table below describes the Parameters available for establishing a connection.

PubSub Client Configuration

NameTypeDescriptionDefault ValueMandatory
addressStringThe address of the KubeMQ server.NoneYes
clientIdStringThe client ID used for authentication.NoneYes
authTokenStringThe authorization token for secure communication.NoneNo
tlsbooleanIndicates if TLS (Transport Layer Security) is enabled.NoneNo
tlsCertFileStringThe path to the TLS certificate file.NoneNo (Yes if tls is true)
tlsKeyFileStringThe path to the TLS key file.NoneNo (Yes if tls is true)
tlsCaCertFileStringThe path to the TLS CA cert file.NoneNo (Yes if tls is true)
maxReceiveSizeintThe maximum size of the messages to receive (in bytes).104857600 (100MB)No
reconnectIntervalSecondsintThe interval in seconds between reconnection attempts.1No

Pubsub Client connection establishment example code


const  opts: Config = {
	address:  'localhost:50000',
	clientId:  Utils.uuid(),
	reconnectIntervalSeconds:  1,
};

const  pubsubClient = new  PubsubClient(opts);

The example below demonstrates to construct PubSubClient with ssl and other configurations:


const  config: Config = {

	address:  'localhost:50000', // KubeMQ gRPC endpoint address
	clientId:  'your-client-id', // Connection clientId
	authToken:  'your-jwt-auth-token', // Optional JWT authorization token
	tls:  true, // Indicates if TLS is enabled
	tlsCertFile:  'path/to/tls-cert.pem', // Path to the TLS certificate file
	tlsKeyFile:  'path/to/tls-key.pem', // Path to the TLS key file
	tlsCaCertFile:  'path/to/tls-key.pem', // Path to the TLS key file
	maxReceiveSize:  1024 * 1024 * 100, // Maximum size of the messages to receive (100MB)
	reconnectIntervalSeconds:  1 // Interval in milliseconds between reconnect attempts (1 second)
};

Ping To KubeMQ server

You can ping the server to check connection is established or not.

Request: NONE

Response: ServerInfo Interface Attributes

NameTypeDescription
hostStringThe host of the server.
versionStringThe version of the server.
serverStartTimelongThe start time of the server (in seconds).
serverUpTimeSecondslongThe uptime of the server (in seconds).

ServerInfo  pingResult = pubsubClient.ping();
console.log('Ping Response: ' + pingResult);

Create Channel

PubSub CreateEventsChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringChannel name which you want to createNoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion

async  function  createEventsChannel(channel: string) {
return  pubsubClient.createEventsChannel(channel);
}

PubSub Create Events Store Channel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringChannel name to which you want to createNoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion


async  function  createEventsStoreChannel(channel: string) {
return  pubsubClient.createEventsStoreChannel(channel);
}

Delete Channel

PubSub DeleteEventsChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringChannel name which you want to deleteNoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion

async  function  deleteEventsChannel(channel: string) {
return  pubsubClient.deleteEventsChannel(channel);
}

PubSub Delete Events Store Channel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringChannel name to which you want to deleteNoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion

async  function  deleteEventsStoreChannel(channel: string) {
return  pubsubClient.deleteEventsStoreChannel(channel);
}

List Channels

PubSub ListEventsChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
searchStringSearch query to filter channels (optional)NoneNo

Response: PubSubChannel[] PubSubChannel interface Attributes

NameTypeDescription
nameStringThe name of the Pub/Sub channel.
typeStringThe type of the Pub/Sub channel.
lastActivitylongThe timestamp of the last activity on the channel, represented in milliseconds since epoch.
isActivebooleanIndicates whether the channel is active or not.
incomingPubSubStatsThe statistics related to incoming messages for this channel.
outgoingPubSubStatsThe statistics related to outgoing messages for this channel.

async  function  listEventsChannel(search: string) {
	const  channels = await  pubsubClient.listEventsChannels(search);
	console.log(channels);
}

PubSub ListEventsStoreChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
searchStringSearch query to filter channels (optional)NoneNo

Response: PubSubChannel[] PubSubChannel interface Attributes

NameTypeDescription
nameStringThe name of the Pub/Sub channel.
typeStringThe type of the Pub/Sub channel.
lastActivitylongThe timestamp of the last activity on the channel, represented in milliseconds since epoch.
isActivebooleanIndicates whether the channel is active or not.
incomingPubSubStatsThe statistics related to incoming messages for this channel.
outgoingPubSubStatsThe statistics related to outgoing messages for this channel.

async  function  listEventsStoreChannel(search: string) {
	const  channels = await  pubsubClient.listEventsStoreChannels(search);
	console.log(channels);
}

PubSub Send & Receive

PubSub SendEventMessage Example:

Request: EventMessage Interface Attributes

NameTypeDescriptionDefault ValueMandatory
idStringUnique identifier for the event message.NoneNo
channelStringThe channel to which the event message is sent.NoneYes
metadataStringMetadata associated with the event message.NoneNo
bodybyte[]Body of the event message in bytes.Empty byte arrayNo
tagsMap<String, String>Tags associated with the event message as key-value pairs.Empty MapNo

Note:- metadata or body or tags any one is required

Response: NONE


await  pubsubClient.sendEventsMessage({
	id:  `234`,
	channel: 'events.single',
	body:  Utils.stringToBytes('event message'),
});

PubSub SendEventStoreMessage Example:

Request: EventStoreMessage Class Attributes

NameTypeDescriptionDefault ValueMandatory
idStringUnique identifier for the event message.NoneNo
channelStringThe channel to which the event message is sent.NoneYes
metadataStringMetadata associated with the event message.NoneNo
bodybyte[]Body of the event message in bytes.Empty byte arrayNo
tagsMap<String, String>Tags associated with the event message as key-value pairs.Empty MapNo

Note:- metadata or body or tags any one is required

Response: NONE


await  pubsubClient.sendEventStoreMessage({
	id:  '987',
	channel: 'events_store.single',
	body:  Utils.stringToBytes('event store message'),
});

PubSub SubscribeEvents Example:

Request: EventsSubscription Class Attributes

NameTypeDescriptionDefault ValueMandatory
channelStringThe channel to subscribe to.NoneYes
groupStringThe group to subscribe with.NoneNo
onReceiveEventCallbackConsumer<EventMessageReceived>Callback function to be called when an event message is received.NoneYes
onErrorCallbackConsumer<String>Callback function to be called when an error occurs.NoneNo

Response: NONE

Callback: EventMessageReceived class details

NameTypeDescription
idStringThe unique identifier of the message.
fromClientIdStringThe ID of the client that sent the message.
timestamplongThe timestamp when the message was received, in seconds.
channelStringThe channel to which the message belongs.
metadataStringThe metadata associated with the message.
bodybyte[]The body of the message.
sequencelongThe sequence number of the message.
tagsMap<String, String>The tags associated with the message.
async function subscribeToEvent() {  
  //Subscribes to events from the specified channel and processes received events.  
  const eventsSubscriptionRequest = new EventsSubscriptionRequest('events.A', '');  
  
  // Define the callback for receiving events  
  eventsSubscriptionRequest.onReceiveEventCallback = (event: EventMessageReceived) => {  
    console.log('SubscriberA received event:', {  
      id: event.id,  
      fromClientId: event.fromClientId,  
      timestamp: event.timestamp,  
      channel: event.channel,  
      metadata: event.metadata,  
      body: event.body,  
      tags: event.tags,  
    });  
  };  
  
  // Define the callback for handling errors  
  eventsSubscriptionRequest.onErrorCallback = (error: string) => {  
    console.error('SubscriberA error:', error);  
  };  
  
  pubsubClient  
  .subscribeToEvents(eventsSubscriptionRequest)  
    .then(() => {  
      console.log('Subscription successful');  
    })  
    .catch((reason: any) => {  
      console.error('Subscription failed:', reason);  
    });  
  
}

PubSub SubscribeEventsStore Example:

Request: EventsStoreSubscription Interface Attributes

NameTypeDescriptionDefault ValueMandatory
channelStringThe channel to subscribe to.NoneYes
groupStringThe group to subscribe with.NoneNo
onReceiveEventCallbackConsumer<EventStoreMessageReceived>Callback function to be called when an event message is received.NoneYes
onErrorCallbackConsumer<String>Callback function to be called when an error occurs.NoneNo

Response: None

Callback: EventStoreMessageReceived class details

NameTypeDescription
idStringThe unique identifier of the message.
fromClientIdStringThe ID of the client that sent the message.
timestamplongThe timestamp when the message was received, in seconds.
channelStringThe channel to which the message belongs.
metadataStringThe metadata associated with the message.
bodybyte[]The body of the message.
sequencelongThe sequence number of the message.
tagsMap<String, String>The tags associated with the message.
async function subscribeToEventStore() {  
  //Subscribes to events store messages from the specified channel with a specific configuration.  
  const eventsSubscriptionRequest = new EventsStoreSubscriptionRequest('events_store.A', '');  
  eventsSubscriptionRequest.eventsStoreType = EventStoreType.StartAtSequence;  
  eventsSubscriptionRequest.eventsStoreSequenceValue=1;  
  
  // Define the callback for receiving events  
  eventsSubscriptionRequest.onReceiveEventCallback = (event: EventStoreMessageReceived) => {  
    console.log('SubscriberA received event:', {  
      id: event.id,  
      fromClientId: event.fromClientId,  
      timestamp: event.timestamp,  
      channel: event.channel,  
      metadata: event.metadata,  
      body: event.body,  
      tags: event.tags,  
      sequence: event.sequence,  
    });  
  };  
  
  // Define the callback for handling errors  
  eventsSubscriptionRequest.onErrorCallback = (error: string) => {  
    console.error('SubscriberA error:', error);  
  };  
  
  pubsubClient  
  .subscribeToEvents(eventsSubscriptionRequest)  
    .then(() => {  
      console.log('Events Subscription successful');  
    })  
    .catch((reason: any) => {  
      console.error('Event Subscription failed:', reason);  
    });  
}

KubeMQ Queues Operations

The examples below demonstrate the usage of KubeMQ Queues client. The examples include creating, deleting, listing channels, and sending/receiving queues messages.

Construct the Queues Client

For executing Queues operation we have to create the instance of QueuesClient, its instance can be created with minimum two parameter address (KubeMQ server address) & clientId . With these two parameter plainText connections are established. The table below describes the Parameters available for establishing a connection.

QueuesClient Configuration

NameTypeDescriptionDefault ValueMandatory
addressStringThe address of the KubeMQ server.NoneYes
clientIdStringThe client ID used for authentication.NoneYes
authTokenStringThe authorization token for secure communication.NoneNo
tlsbooleanIndicates if TLS (Transport Layer Security) is enabled.NoneNo
tlsCertFileStringThe path to the TLS certificate file.NoneNo (Yes if tls is true)
tlsKeyFileStringThe path to the TLS key file.NoneNo (Yes if tls is true)
tlsCaCertFileStringThe path to the TLS CA cert file.NoneNo (Yes if tls is true)
maxReceiveSizeintThe maximum size of the messages to receive (in bytes).104857600 (100MB)No
reconnectIntervalSecondsintThe interval in seconds between reconnection attempts.1No

Queues Client establishing a connection example code


const  opts: Config = {
	address:  'localhost:50000',
	clientId:  Utils.uuid(),
};

const  queuesClient = new  QueuesClient(opts);

The example below demonstrates to construct PubSubClient with ssl and other configurations:

const  opts: Config = {
	address:  'localhost:50000', // KubeMQ gRPC endpoint address
	clientId:  'your-client-id', // Connection clientId
	authToken:  'your-jwt-auth-token', // Optional JWT authorization token
	tls:  true, // Indicates if TLS is enabled
	tlsCertFile:  'path/to/tls-cert.pem', // Path to the TLS certificate file
	tlsKeyFile:  'path/to/tls-key.pem', // Path to the TLS key file
	tlsCaCertFile:  'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
	maxReceiveSize:  1024 * 1024 * 100, // The Maximum size of the messages to receive (100MB)
	reconnectIntervalSeconds:  1 // Interval in milliseconds between reconnect attempts (1 second)
};

const  queuesClient = new  QueuesClient(opts);

Ping To KubeMQ server

You can ping the server to check connection is established or not.

Request: NONE

Response: ServerInfo Class Attributes

NameTypeDescription
hostStringThe host of the server.
versionStringThe version of the server.
serverStartTimelongThe start time of the server (in seconds).
serverUpTimeSecondslongThe uptime of the server (in seconds).

const  pingResult = queuesClient.ping();
console.log('Ping Response: ' + pingResult);

Create Channel

Queues CreateQueueChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringThe name of the channel you want to createNoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion

async  function  createQueueChannel(channel: string) {
	return  queuesClient.createQueuesChannel(channel);
}

Delete Channel

Queues DeleteQueueChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringThe name of the channel you want to deleteNoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion

async  function  createQueueChannel(channel: string) {
	return  queuesClient.deleteQueuesChannel(channel);
}

List Channels

Queues listQueueChannels Example:

Request:

NameTypeDescriptionDefault ValueMandatory
searchStringStringThe channel name you want to search forNoneNo

Response: QueuesChannel[] QueuesChannel interface Attributes

NameTypeDescription
nameStringThe name of the queue channel.
typeStringThe type of the queue channel.
lastActivitylongThe timestamp of the last activity in the queue channel.
isActivebooleanIndicates whether the queue channel is currently active.
incomingQueuesStatsThe statistics for incoming messages in the queue channel.
outgoingQueuesStatsThe statistics for outgoing messages in the queue channel.

async  function  listQueueChannels(search: string) {
	const  channels = await  queuesClient.listQueuesChannel(search);
	console.log(channels);
}

Send & Receive Queue Messages

Queues SendSingleMessage Example:

Request: QueueMessage class attributes

NameTypeDescriptionDefault ValueMandatory
idStringThe unique identifier for the message.NoneNo
channelStringThe channel of the message.NoneYes
metadataStringThe metadata associated with the message.NoneNo
bodybyte[]The body of the message.new byte[0]No
tagsMap<String, String>The tags associated with the message.new HashMap<>()No
delayInSecondsintThe delay in seconds before the message becomes available in the queue.NoneNo
expirationInSecondsintThe expiration time in seconds for the message.NoneNo
attemptsBeforeDeadLetterQueueintThe number of receive attempts allowed before the message is moved to the dead letter queue.NoneNo
deadLetterQueueStringThe dead letter queue where the message will be moved after reaching max receive attempts.NoneNo

Response: QueueSendResult class attributes

NameTypeDescription
idStringThe unique identifier of the message.
sentAtLocalDateTimeThe timestamp when the message was sent.
expiredAtLocalDateTimeThe timestamp when the message will expire.
delayedToLocalDateTimeThe timestamp when the message will be delivered.
isErrorbooleanIndicates if there was an error while sending the message.
errorStringThe error message if isError is true.

await  queuesClient.sendQueuesMessage({
	channel:  'queues.single',
	body:  Utils.stringToBytes('queue message'),
})
.then((result) =>  console.log(result))
.catch((reason) =>  console.error(reason));

Queues Pulls messages from a queue. Example:

Request: QueuesPullWaitingMessagesRequest class attributes

NameTypeDescriptionDefault ValueMandatory
channelStringThe channel to poll messages from.NoneYes
maxNumberOfMessagesintThe maximum number of messages to poll.1No
waitTimeoutSecondsintThe wait timeout in seconds for polling messages.60No

Response: QueuesPullWaitingMessagesResponse class attributes

NameTypeDescription
idStringThe reference ID of the request.
messagesReceivednumberNumber of valid messages received.
messagesQueueMessage[]The list of received queue messages.
errorStringThe error message, if any error occurred.
isErrorbooleanIndicates if there was an error.
isPeekbooleanIndicates if it is a peek or pull operation.
messagesExpirednumberNumber of expired messages from the queue.

await  queuesClient
.pull({
	channel:  'queues.peek',
	maxNumberOfMessages:  10,
	waitTimeoutSeconds:  10,
})

.then((response) => {
	response.messages.forEach((msg) => {
	console.log(msg);
});
})

.catch((reason) => {
	console.error(reason);
});

Queues Get waiting messages from a queue Example:

Request: QueuesPullWaitngMessagesRequest class attributes

NameTypeDescriptionDefault ValueMandatory
channelStringThe channel to poll messages from.NoneYes
maxNumberOfMessagesintThe maximum number of messages to poll.1No
waitTimeoutSecondsintThe wait timeout in seconds for polling messages.60No

Response: QueuesPullWaitingMessagesResponse class attributes

NameTypeDescription
idStringThe reference ID of the request.
messagesReceivednumberNumber of valid messages received.
messagesQueueMessage[]The list of received queue messages.
errorStringThe error message, if any error occurred.
isErrorbooleanIndicates if there was an error.
isPeekbooleanIndicates if the operation is a peek or pull.
messagesExpirednumberNumber of expired messages from the queue.

await  queuesClient
.waiting({
	channel:  'queues.peek',
	maxNumberOfMessages:  5,
	waitTimeoutSeconds:  20,
})

.then((response) => {
	response.messages.forEach((msg) => {
	console.log(msg);
});
})
.catch((reason) => {
	console.error(reason);
});

Poll Queue Messages

Receives messages from a Queue channel.

Request: QueuesPollRequest Class Attributes

NameTypeDescriptionDefault ValueMandatory
channelStringThe channel to poll messages from.NoneYes
pollMaxMessagesintThe maximum number of messages to poll.1No
pollWaitTimeoutInSecondsintThe wait timeout in seconds for polling messages.60No
autoAckMessagesbooleanIndicates if messages should be auto-acknowledged.falseNo
visibilitySecondsintAdd a visibility timeout feature for messages.0No

Response: QueuesMessagesPulledResponse Class Attributes

NameTypeDescription
idStringThe reference ID of the request.
messagesQueueMessageReceived[]The list of received queue messages.
messagesReceivednumberNumber of valid messages received.
messagesExpirednumberNumber of messages expired.
isPeekbooleanIndicates if the operation is a peek or pull.
errorStringThe error message, if any error occurred.
isErrorbooleanIndicates if there was an error.
visibilitySecondsintThe visibility timeout for the message in seconds.
isAutoAckedbooleanIndicates whether the message was auto-acknowledged.

Response: QueueMessageReceived class attributes

Here's the requested Markdown table for the QueueMessageReceived class:

NameTypeDescription
idStringThe unique identifier for the message.
channelStringThe channel from which the message was received.
metadataStringMetadata associated with the message.
bodybyte[]The body of the message in byte array format.
fromClientIdStringThe ID of the client that sent the message.
tagsMap<String, String>Key-value pairs representing tags for the message.
timestampInstantThe timestamp when the message was created.
sequencelongThe sequence number of the message.
receiveCountintThe number of times the message has been received.
isReRoutedbooleanIndicates whether the message was rerouted.
reRouteFromQueueStringThe name of the queue from which the message was rerouted.
expiredAtInstantThe expiration time of the message, if applicable.
delayedToInstantThe time the message is delayed until, if applicable.
transactionIdStringThe transaction ID associated with the message.
isTransactionCompletedbooleanIndicates whether the transaction for the message is completed.
responseHandlerStreamObserver<QueuesDownstreamRequest>The response handler for processing downstream requests.
receiverClientIdStringThe ID of the client receiving the message.
visibilitySecondsintThe visibility timeout for the message in seconds.
isAutoAckedbooleanIndicates whether the message was auto-acknowledged.

Example

async function main() {  
  const opts: Config = {  
    address: 'localhost:50000',  
    clientId: 'kubeMQClientId-ts',  
  };  
  const queuesClient = new QueuesClient(opts);  
  
  // Receive with message visibility  
  async function receiveWithVisibility(visibilitySeconds: number) {  
    console.log("\n============================== Receive with Visibility =============================\n");  
    try {  
      const pollRequest = new QueuesPollRequest({  
        channel: 'visibility_channel',  
        pollMaxMessages: 1,  
        pollWaitTimeoutInSeconds: 10,  
        visibilitySeconds: visibilitySeconds,  
        autoAckMessages: false,  
      });  
  
      const pollResponse = await queuesClient.receiveQueuesMessages(pollRequest);  
      console.log("Received Message Response:", pollResponse);  
        
      if (pollResponse.isError) {  
        console.log("Error: " + pollResponse.error);  
      } else {  
        pollResponse.messages.forEach(async (msg) => {  
          console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);  
          try {  
            await new Promise(resolve => setTimeout(resolve, 1000));  
            await msg.ack();  
            console.log("Acknowledged message");  
          } catch (err) {  
            console.error("Error acknowledging message:", err);  
          }  
        });  
      }  
    } catch (error) {  
      console.error('Failed to receive queue messages:', error);  
    }  
  }  
  
  // Test visibility expiration  
  async function receiveWithVisibilityExpired() {  
    console.log("\n============================== Receive with Visibility Expired =============================\n");  
    await receiveWithVisibility(2);  
  }  
  
  // Test visibility extension  
  async function receiveWithVisibilityExtension() {  
    console.log("\n============================== Receive with Visibility Extension =============================\n");  
    try {  
      const pollRequest = new QueuesPollRequest({  
        channel: 'visibility_channel',  
        pollMaxMessages: 1,  
        pollWaitTimeoutInSeconds: 10,  
        visibilitySeconds: 3,  
        autoAckMessages: false,  
      });  
  
      const pollResponse = await queuesClient.receiveQueuesMessages(pollRequest);  
      console.log("Received Message Response:", pollResponse);  
  
      if (pollResponse.isError) {  
        console.log("Error: " + pollResponse.error);  
      } else {  
        pollResponse.messages.forEach(async (msg) => {  
          console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);  
          try {  
            await new Promise(resolve => setTimeout(resolve, 1000));  
            await msg.extendVisibilityTimer(3);  
            await new Promise(resolve => setTimeout(resolve, 2000));  
            await msg.ack();  
            console.log("Acknowledged message after extending visibility");  
          } catch (err) {  
            console.error("Error during visibility extension:", err);  
          }  
        });  
      }  
    } catch (error) {  
      console.error('Failed to receive queue messages:', error);  
    }  
  }  
  
  await receiveWithVisibilityExpired();  
  await receiveWithVisibilityExtension();  
}  
  
main();

This method allows you to receive messages from a specified Queue channel. You can configure the polling behavior, including the maximum number of messages to receive and the wait timeout. The response provides detailed information about the received messages and the transaction.

Message Handling Options:

  1. Acknowledge (ack): Mark the message as processed and remove it from the queue.
  2. Reject: Reject the message. It won't be requeued.
  3. Requeue: Send the message back to the queue for later processing.

Choose the appropriate handling option based on your application's logic and requirements.

KubeMQ Command & Query Operations

Construct the CQClient

For executing command & query operation we have to create the instance of CQClient, its instance can be created with minimum two parameter address (KubeMQ server address) & clientId . With these two parameter plainText connections are established. The table below describes the Parameters available for establishing a connection.

CQClient Configuration

NameTypeDescriptionDefault ValueMandatory
addressStringThe address of the KubeMQ server.NoneYes
clientIdStringThe client ID used for authentication.NoneYes
authTokenStringThe authorization token for secure communication.NoneNo
tlsbooleanIndicates if TLS (Transport Layer Security) is enabled.NoneNo
tlsCertFileStringThe path to the TLS certificate file.NoneNo (Yes if tls is true)
tlsKeyFileStringThe path to the TLS key file.NoneNo (Yes if tls is true)
tlsCaCertFileStringThe path to the TLS CA cert file.NoneNo (Yes if tls is true)
maxReceiveSizeintThe maximum size of the messages to receive (in bytes).104857600 (100MB)No
reconnectIntervalSecondsintThe interval in seconds between reconnection attempts.1No

CQClient establishing a connection example code


const  opts: Config = {

    address:  'localhost:50000',
    clientId:  Utils.uuid(),
    reconnectIntervalSeconds:  1,
};

const  cqClient = new  CQClient(opts);

The example below demonstrates to construct CQClient with ssl and other configurations:


const  config: Config = {

    address:  'localhost:50000', // KubeMQ gRPC endpoint address
    clientId:  'your-client-id', // Connection clientId
    authToken:  'your-jwt-auth-token', // Optional JWT authorization token
    tls:  true, // Indicates if TLS is enabled
    tlsCertFile:  'path/to/tls-cert.pem', // Path to the TLS certificate file
    tlsKeyFile:  'path/to/tls-key.pem', // Path to the TLS key file
    tlsCaCertFile:  'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
    maxReceiveSize:  1024 * 1024 * 100, // Maximum size of the messages to receive (100MB)
    reconnectIntervalSeconds:  1, // Interval in milliseconds between reconnect attempts (1 second)
};
const  cqClient = new  CQClient(opts);

Ping To KubeMQ server

You can ping the server to check connection is established or not.

Request: NONE

Response: ServerInfo interface Attributes

NameTypeDescription
hostStringThe host of the server.
versionStringThe version of the server.
serverStartTimelongThe start time of the server (in seconds).
serverUpTimeSecondslongThe uptime of the server (in seconds).

const  pingResult = cqClient.ping();
console.log('Ping Response: ' + pingResult);

Create Channel

Command CreateCommandsChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringChannel name which you want to createNoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion

async  function  createCommandsChannel(channel: string) {
    return  cqClient.createCommandsChannel(channel);
}

Queries CreateQueriesChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringThe name of the channel to create.NoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion

async  function  createQueriesChannel(channel: string) {
    return  cqClient.createQueriesChannel(channel);
}

Delete Channel

Command DeleteCommandsChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringChannel name which you want to deleteNoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion

async  function  deleteCommandsChannel(channel: string) {
    return  cqClient.deleteCommandsChannel(channel);
}

Queries DeleteQueriesChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
channelNameStringThe name of the channel to deleteNoneYes

Response:

NameTypeDescription
voidPromise<void>Doesn't return a value upon completion

async  function  deleteQueriesChannel(channel: string) {
    return  cqClient.deleteQueriesChannel(channel);
}

List Channels

Command ListCommandsChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
searchStringStringThe name of the channel to search for.NoneNo

Response: CQChannel[] CQChannel interface attributes

NameTypeDescription
nameStringThe name of the channel.
typeStringThe type of the channel.
lastActivitylongThe timestamp of the last activity on the channel.
isActivebooleanIndicates whether the channel is currently active.
incomingCQStatsStatistics about incoming messages to the channel.
outgoingCQStatsStatistics about outgoing messages from the channel.

async  function  listCommandsChannels(search: string) {
    const  channels = await  cqClient.listCommandsChannels(search);
    console.log(channels);
}

Queries ListQueriesChannel Example:

Request:

NameTypeDescriptionDefault ValueMandatory
searchStringStringChannel name which you want to searchNoneNo

Response: List<CQChannel> CQChannel class attributes

NameTypeDescription
nameStringThe name of the channel.
typeStringThe type of the channel.
lastActivitylongThe timestamp of the last activity on the channel.
isActivebooleanIndicates whether the channel is currently active.
incomingCQStatsStatistics about incoming messages to the channel.
outgoingCQStatsStatistics about outgoing messages from the channel.

async  function  listQueriesChannels(search: string) {
    const  channels = await  cqClient.listQueriesChannels(search);
    console.log(channels);
}

Send & Receive Command & Query Messages

Command SubscribeToCommandsChannel Example:

Request: CommandsSubscription Class Attributes

NameTypeDescriptionDefault ValueMandatory
channelStringThe channel for the subscription.NoneYes
groupStringThe group associated with the subscription.NoneNo
onReceiveCommandCallbackCommandsReceiveMessageCallback function for receiving commands.NoneYes

Response: None

Callback: CommandsReceiveMessage interface attributes

NameTypeDescription
commandReceivedCommandsReceiveMessageThe command message that was received.
clientIdStringThe ID of the client that sent the command.
requestIdStringThe ID of the request.
isExecutedbooleanIndicates whether the command was executed.
timestampLocalDateTimeThe timestamp of the response.
errorStringThe error message if an error occurred.
async function subscribeToCommands(channelName: string) {
    //Subscribes to commands from the specified channel with a specific configuration.  
    const commandSubscriptionRequest = new CommandsSubscriptionRequest(channelName, 'group1');

    // Define the callback for receiving commandMessage  
    commandSubscriptionRequest.onReceiveEventCallback = (commandMessage: CommandMessageReceived) => {
        console.log('SubscriberA received commandMessage:', {
            id: commandMessage.id,
            fromClientId: commandMessage.fromClientId,
            timestamp: commandMessage.timestamp,
            channel: commandMessage.channel,
            metadata: commandMessage.metadata,
            body: commandMessage.body,
            tags: commandMessage.tags,
        });
    };

    // Define the callback for handling errors  
    commandSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
    };

    cqClient.subscribeToCommands(commandSubscriptionRequest)
        .then(() => {
            console.log('Command Subscription successful');
        })
        .catch((reason: any) => {
            console.error('Command Subscription failed:', reason);
        });
}

Queries SubscribeToQueriesChannel Example:

Request: QueriesSubscriptionRequest Class Attributes

NameTypeDescriptionDefault ValueMandatory
channelStringThe channel for the subscription.NoneYes
groupStringThe group associated with the subscription.NoneNo
onReceiveQueriesCallbackQueriesReceiveMessageCallback function for receiving queries.NoneYes

Response: None

Callback: QueriesReceiveMessage interface attributes

NameTypeDescription
idStringThe ID of the request.
channelStringChannel name from which the message was received.
metadataStringMetadata of the message.
bodyUint8ArrayThe body of the response.
tagsMap<String, String>Tags associated with the query message.
replyChannelStringThe reply channel for this message.
async function subscribeToQueries(channelName: string) {

    //Subscribes to queries from the specified channel with a specific configuration.  
    const commandSubscriptionRequest = new CommandsSubscriptionRequest(channelName, 'group1');

    // Define the callback for receiving queriesMessage  
    commandSubscriptionRequest.onReceiveEventCallback = (commandMessage: CommandMessageReceived) => {
        console.log('SubscriberA received event:', {
            id: commandMessage.id,
            fromClientId: commandMessage.fromClientId,
            timestamp: commandMessage.timestamp,
            channel: commandMessage.channel,
            metadata: commandMessage.metadata,
            body: commandMessage.body,
            tags: commandMessage.tags,
        });
    };

    // Define the callback for handling errors  
    commandSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
    };

    cqClient.subscribeToQueries(commandSubscriptionRequest)
        .then(() => {
            console.log('Queries Subscription successful');
        })
        .catch((reason: any) => {
            console.error('Queries Subscription failed:', reason);
        });
}