Home

Awesome

KubeMQ Go SDK

The KubeMQ SDK for Go enables Go developers to seamlessly communicate with the KubeMQ server, implementing various communication patterns such as Events, EventStore, Commands, Queries, and Queues.

Table of Contents

<!-- TOC --> <!-- TOC -->

Prerequisites

Installation

go get github.com/kubemq-io/kubemq-go

Running Examples

The examples are standalone projects that showcase the usage of the SDK. To run the examples, ensure you have a running instance of KubeMQ.

SDK Overview

The SDK implements all communication patterns available through the KubeMQ server:

PubSub Events Operations

Create Channel

Create a new Events channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
CtxcontextThe context for the request.NoneYes
ChannelNameStringName of the channel you want to createNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func createEventsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-creator"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := eventsClient.Create(ctx, "events-channel"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Events channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
ChannelNameStringName of the channel you want to deleteNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func deleteEventsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-delete"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := eventsClient.Delete(ctx, "events-channel"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Events channels.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
SearchQueryStringSearch query to filter channels (optional)NoneNo

Response

Returns a list where each PubSubChannel has the following attributes:

NameTypeDescription
NameStringThe name of the Pub/Sub channel.
TypeStringThe type of the Pub/Sub channel.
LastActivitylongThe timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActivebooleanIndicates whether the channel is active or not.
IncomingPubSubStatsThe statistics related to incoming messages for this channel.
OutgoingPubSubStatsThe statistics related to outgoing messages for this channel.

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func listEventsChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-channel-lister"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := eventsClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send Event / Subscribe Message

Sends a message to an Events channel.

Send Request: Event

NameTypeDescriptionDefault ValueMandatory
IdStringUnique identifier for the event message.NoneNo
ChannelStringThe channel to which the event message is sent.NoneYes
MetadataStringMetadata associated with the event message.NoneNo
Bodybyte[]Body of the event message in bytes.Empty byte arrayNo
TagsMap<String, String>Tags associated with the event message as key-value pairs.Empty MapNo

Send Response

NameTypeDescription
ErrerrorError message if any

Subscribe Request: EventsSubscription

NameTypeDescriptionDefault ValueMandatory
ChannelStringThe channel to subscribe to.NoneYes
GroupStringThe group to subscribe with.NoneNo

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

func sendSubscribeEvents() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsClient, err := kubemq.NewEventsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-send-subscribe"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsSubscription{
		Channel:  "events-channel",
		Group:    "",
		ClientId: "",
	}
	err = eventsClient.Subscribe(ctx, subReq, func(msg *kubemq.Event, err error) {
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	time.Sleep(300 * time.Second)

	err = eventsClient.Send(ctx, &kubemq.Event{
		Channel:  "events-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event"),
	})

	if err != nil {
		log.Fatal(err)
	}
	time.Sleep(1 * time.Second)
}

PubSub EventsStore Operations

Create Channel

Create a new Events Store channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
CtxcontextThe context for the request.NoneYes
ChannelNameStringName of the channel you want to createNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)
func createEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-creator"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := eventsStoreClient.Create(ctx, "events-store-channel"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Events Store channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
ChannelNameStringName of the channel you want to deleteNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func deleteEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-delete"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := eventsStoreClient.Delete(ctx, "events-store-channel"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Events channels.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
SearchQueryStringSearch query to filter channels (optional)NoneNo

Response

Returns a list where each PubSubChannel has the following attributes:

NameTypeDescription
NameStringThe name of the Pub/Sub channel.
TypeStringThe type of the Pub/Sub channel.
LastActivitylongThe timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActivebooleanIndicates whether the channel is active or not.
IncomingPubSubStatsThe statistics related to incoming messages for this channel.
OutgoingPubSubStatsThe statistics related to outgoing messages for this channel.

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func listEventsStoreChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-channel-lister"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := eventsStoreClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send Event / Subscribe Message

Sends a message to an Events channel.

Send Request: Event

NameTypeDescriptionDefault ValueMandatory
IdStringUnique identifier for the event message.NoneNo
ChannelStringThe channel to which the event message is sent.NoneYes
MetadataStringMetadata associated with the event message.NoneNo
Bodybyte[]Body of the event message in bytes.Empty byte arrayNo
TagsMap<String, String>Tags associated with the event message as key-value pairs.Empty MapNo

Send Response

NameTypeDescription
ErrerrorError message if any

Subscribe Request: EventsStoreSubscription

NameTypeDescriptionDefault ValueMandatory
ChannelStringThe channel to subscribe to.NoneYes
GroupStringThe group to subscribe with.NoneNo
SubscriptionTypeEventsStoreSubscriptionThe SubscriptionNoneYes

EventsStoreType Options

TypeValueDescription
StartNewOnly1Start storing events from the point when the subscription is made
StartFromFirst2Start storing events from the first event available
StartFromLast3Start storing events from the last event available
StartAtSequence4Start storing events from a specific sequence number
StartAtTime5Start storing events from a specific point in time
StartAtTimeDelta6Start storing events from a specific time delta in seconds

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

func sendSubscribeEventsStore() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("events-store-send-subscribe"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := eventsStoreClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subReq := &kubemq.EventsStoreSubscription{
		Channel:          "events-store-channel",
		Group:            "",
		ClientId:         "",
		SubscriptionType: kubemq.StartFromFirstEvent(),
	}
	err = eventsStoreClient.Subscribe(ctx, subReq, func(msg *kubemq.EventStoreReceive, err error) {
		log.Println(msg.String())
	})
	if err != nil {
		log.Fatal(err)
	}
	time.Sleep(1 * time.Second)

	result, err := eventsStoreClient.Send(ctx, &kubemq.EventStore{
		Channel:  "events-store-channel",
		Metadata: "some-metadata",
		Body:     []byte("hello kubemq - sending event store"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	time.Sleep(1 * time.Second)

}

Commands & Queries – Commands Operations

Create Channel

Create a new Command channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
CtxcontextThe context for the request.NoneYes
ChannelNameStringName of the channel you want to createNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func createCommandsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := commandsClient.Create(ctx, "commands.A"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Command channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
ChannelNameStringName of the channel you want to deleteNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func deleteCommandsChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := commandsClient.Delete(ctx, "commands.A"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Command channels.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
SearchQueryStringSearch query to filter channels (optional)NoneNo

Response

Returns a list where each CQChannel has the following attributes:

NameTypeDescription
NameStringThe name of the Pub/Sub channel.
TypeStringThe type of the Pub/Sub channel.
LastActivitylongThe timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActivebooleanIndicates whether the channel is active or not.
IncomingPubSubStatsThe statistics related to incoming messages for this channel.
OutgoingPubSubStatsThe statistics related to outgoing messages for this channel.

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func listCommandsChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := commandsClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send Command / Receive Request

Sends a command request to a Command channel.

Send Request: CommandMessage

NameTypeDescriptionDefault ValueMandatory
IdStringThe ID of the command message.NoneYes
ChannelStringThe channel through which the command message will be sent.NoneYes
MetadataStringAdditional metadata associated with the command message.NoneNo
Bodybyte[]The body of the command message as bytes.Empty byte arrayNo
TagsMap<String, String>A dictionary of key-value pairs representing tags associated with the command message.Empty MapNo
TimeoutDurationThe maximum time duration for waiting to response.NoneYes

Send Response: CommandResponseMessage

NameTypeDescription
CommandIdStringCommand Id
ResponseClientIdStringThe client ID associated with the command response.
ExecutedbooleanIndicates if the command has been executed.
ExecutedAttimeThe timestamp when the command response was created.
ErrorStringThe error message if there was an error.

Receive Request: CommandsSubscription

NameTypeDescriptionDefault ValueMandatory
ChannelStringThe channel for the subscription.NoneYes
GroupStringThe group associated with the subscription.NoneNo

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

func sendReceiveCommands() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	commandsClient, err := kubemq.NewCommandsClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveCommands"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := commandsClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.CommandsSubscription{
		Channel:  "commands",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to commands")
	err = commandsClient.Subscribe(ctx, subRequest, func(cmd *kubemq.CommandReceive, err error) {
		log.Println(cmd.String())
		resp := &kubemq.Response{
			RequestId:  cmd.Id,
			ResponseTo: cmd.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
		}
		if err := commandsClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	time.Sleep(1 * time.Second)
	log.Println("sending command")
	result, err := commandsClient.Send(ctx, kubemq.NewCommand().
		SetChannel("commands").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending command")).
		SetTimeout(time.Duration(10)*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}

Commands & Queries – Queries Operations

Create Channel

Create a new Query channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
CtxcontextThe context for the request.NoneYes
ChannelNameStringName of the channel you want to createNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)


func createQueriesChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := queriesClient.Create(ctx, "queries.A"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Query channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
ChannelNameStringName of the channel you want to deleteNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func deleteQueriesChannel() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := queriesClient.Delete(ctx, "queries.A"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Query channels.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
SearchQueryStringSearch query to filter channels (optional)NoneNo

Response

Returns a list where each CQChannel has the following attributes:

NameTypeDescription
NameStringThe name of the Pub/Sub channel.
TypeStringThe type of the Pub/Sub channel.
LastActivitylongThe timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActivebooleanIndicates whether the channel is active or not.
IncomingPubSubStatsThe statistics related to incoming messages for this channel.
OutgoingPubSubStatsThe statistics related to outgoing messages for this channel.

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
)

func listQueriesChannels() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := queriesClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send Query / Receive Request

Sends a query request to a Query channel.

Send Request: QueryMessage

NameTypeDescriptionDefault ValueMandatory
IdStringThe ID of the query message.NoneYes
ChannelStringThe channel through which the query message will be sent.NoneYes
MetadataStringAdditional metadata associated with the query message.NoneNo
Bodybyte[]The body of the query message as bytes.Empty byte arrayNo
TagsMap<String, String>A dictionary of key-value pairs representing tags associated with the query message.Empty MapNo
TimeoutDurationThe maximum time duration for waiting to response.NoneYes

Send Response: QueryResponse

NameTypeDescription
QueryIdStringQuery Id
ResponseClientIdStringThe client ID associated with the query response.
ExecutedbooleanIndicates if the query has been executed.
MetadataStringAdditional metadata associated with the query response message.
Bodybyte[]The body of the query response message as bytes.
TagsMap<String, String>A dictionary of key-value pairs representing tags associated with the query response message.
ExecutedAttimeThe timestamp when the query response was created.
ErrorStringThe error message if there was an error.

Receive Request: QuerySubscription

NameTypeDescriptionDefault ValueMandatory
ChannelStringThe channel for the subscription.NoneYes
GroupStringThe group associated with the subscription.NoneNo

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go"
	"log"
	"time"
)

func sendReceiveQueries() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queriesClient, err := kubemq.NewQueriesClient(ctx,
		kubemq.WithAddress("localhost", 50000),
		kubemq.WithClientId("sendReceiveQueries"))

	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queriesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	subRequest := &kubemq.QueriesSubscription{
		Channel:  "queries",
		ClientId: "",
		Group:    "",
	}
	log.Println("subscribing to queries")
	err = queriesClient.Subscribe(ctx, subRequest, func(query *kubemq.QueryReceive, err error) {
		log.Println(query.String())
		resp := &kubemq.Response{
			RequestId:  query.Id,
			ResponseTo: query.ResponseTo,
			Metadata:   "some-metadata",
			ExecutedAt: time.Now(),
			Body:       []byte("hello kubemq - sending query response"),
		}
		if err := queriesClient.Response(ctx, resp); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	time.Sleep(1 * time.Second)
	log.Println("sending query")
	result, err := queriesClient.Send(ctx, kubemq.NewQuery().
		SetChannel("queries").
		SetMetadata("some-metadata").
		SetBody([]byte("hello kubemq - sending query")).
		SetTimeout(time.Duration(10)*time.Second))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
}

Queues Operations

Create Channel

Create a new Queue channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
CtxcontextThe context for the request.NoneYes
ChannelNameStringName of the channel you want to createNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := queuesClient.Create(ctx, "queues.A"); err != nil {
		log.Fatal(err)
	}
}

Delete Channel

Delete an existing Queue channel.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
ChannelNameStringName of the channel you want to deleteNoneYes

Response

NameTypeDescription
ErrerrorError message if any

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	if err := queuesClient.Delete(ctx, "queues.A"); err != nil {
		log.Fatal(err)
	}
}

List Channels

Retrieve a list of Queue channels.

Request Parameters

NameTypeDescriptionDefault ValueMandatory
SearchQueryStringSearch query to filter channels (optional)NoneNo

Response

Returns a list where each QueuesChannel has the following attributes:

NameTypeDescription
NameStringThe name of the Pub/Sub channel.
TypeStringThe type of the Pub/Sub channel.
LastActivitylongThe timestamp of the last activity on the channel, represented in milliseconds since epoch.
IsActivebooleanIndicates whether the channel is active or not.
IncomingPubSubStatsThe statistics related to incoming messages for this channel.
OutgoingPubSubStatsThe statistics related to outgoing messages for this channel.

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	channels, err := queuesClient.List(ctx, "")
	if err != nil {
		log.Fatal(err)
	}
	for _, channel := range channels {
		log.Println(channel)
	}
}

Send / Receive Queue Messages

Send and receive messages from a Queue channel.

Send Request: QueueMessage

NameTypeDescriptionDefault ValueMandatory
IdStringThe unique identifier for the message.NoneNo
ChannelStringThe channel of the message.NoneYes
MetadataStringThe metadata associated with the message.NoneNo
Bodybyte[]The body of the message.new byte[0]No
TagsMap<String, String>The tags associated with the message.new HashMap<>()No
PolicyDelaySecondsintThe delay in seconds before the message becomes available in the queue.NoneNo
PolicyExpirationSecondsintThe expiration time in seconds for the message.NoneNo
PolicyMaxReceiveCountintThe number of receive attempts allowed for the message before it is moved to the dead letter queue.NoneNo
PolicyMaxReceiveQueueStringThe dead letter queue where the message will be moved after reaching the maximum receive attempts.NoneNo

Send Response: SendResult

NameTypeDescription
IdStringThe unique identifier of the message.
SentAtLocalDateTimeThe timestamp when the message was sent.
ExpiredAtLocalDateTimeThe timestamp when the message will expire.
DelayedToLocalDateTimeThe timestamp when the message will be delivered.
IsErrorbooleanIndicates if there was an error while sending the message.
ErrorStringThe error message if isError is true.

Receive Request: PollRequest

NameTypeDescriptionDefault ValueMandatory
ChannelStringThe channel to poll messages from.NoneYes
MaxItemsintThe maximum number of messages to poll.1No
WaitTimeoutintThe wait timeout in seconds for polling messages.60No
AutoAckbooleanIndicates if messages should be auto-acknowledged.falseNo
VisibilitySecondsintAdd a visibility timeout feature for messages.0No

Response: PollResponse

NameTypeDescription
MessagesList<QueueMessage>The list of received queue messages.
Response: QueueMessage
NameTypeDescription
IdStringThe unique identifier for the message.
ChannelStringThe channel from which the message was received.
MetadataStringMetadata associated with the message.
Bodybyte[]The body of the message in byte array format.
ClientIDStringThe ID of the client that sent the message.
TagsMap<String, String>Key-value pairs representing tags for the message.
TimestampInstantThe timestamp when the message was created.
SequencelongThe sequence number of the message.
ReceiveCountintThe number of times the message has been received.
ReRoutedbooleanIndicates whether the message was rerouted.
ReRoutedFromQueueStringThe name of the queue from which the message was rerouted.
ExpirationAtInstantThe expiration time of the message, if applicable.
DelayedToInstantThe time the message is delayed until, if applicable.

Example

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
	"time"
)

func sendAndReceive() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceive").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue")).
		SetTags(map[string]string{
			"key1": "value1",
			"key2": "value2",
		}).
		SetPolicyDelaySeconds(1).
		SetPolicyExpirationSeconds(10).
		SetPolicyMaxReceiveCount(3).
		SetPolicyMaxReceiveQueue("dlq")
	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceive").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetAutoAck(true)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	//if err != nil {
	//	log.Fatal(err)
	//}
	//AckAll - Acknowledge all messages
	//if err := msgs.AckAll(); err != nil {
	//	log.Fatal(err)
	//}

	//NackAll - Not Acknowledge all messages
	//if err := msgs.NAckAll(); err != nil {
	//	log.Fatal(err)
	//}

	// RequeueAll - Requeue all messages
	//if err := msgs.ReQueueAll("requeue-queue-channel"); err != nil {
	//	log.Fatal(err)
	//}

	for _, msg := range msgs.Messages {
		log.Println(msg.String())

		// Ack - Acknowledge message
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}

		// Nack - Not Acknowledge message
		//if err := msg.NAck(); err != nil {
		//	log.Fatal(err)
		//}

		// Requeue - Requeue message
		//if err := msg.ReQueue("requeue-queue-channel"); err != nil {
		//	log.Fatal(err)
		//}
	}

}

Example with Visibility

package main

import (
	"context"
	"github.com/kubemq-io/kubemq-go/queues_stream"
	"log"
	"time"
)

func sendAndReceiveWithVisibilityExpiration() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))

	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		log.Println("Received message, waiting 3 seconds before ack")
		time.Sleep(3 * time.Second)
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
	}

}
func sendAndReceiveWithVisibilityExtension() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()
	msg := queues_stream.NewQueueMessage().
		SetId("message_id").
		SetChannel("sendAndReceiveWithVisibility").
		SetMetadata("some-metadata").
		SetBody([]byte("hello world from KubeMQ queue - with visibility"))

	result, err := queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(result)
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("sendAndReceiveWithVisibility").
		SetMaxItems(1).
		SetWaitTimeout(10).
		SetVisibilitySeconds(2)
	msgs, err := queuesClient.Poll(ctx, pollRequest)
	for _, msg := range msgs.Messages {
		log.Println(msg.String())
		log.Println("Received message, waiting 1 seconds before ack")
		time.Sleep(1 * time.Second)
		log.Println("Extending visibility for 3 seconds and waiting 2 seconds before ack")
		if err := msg.ExtendVisibility(3); err != nil {
			log.Fatal(err)
		}
		time.Sleep(2 * time.Second)
		if err := msg.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}