Awesome
KubeMQ Go SDK
The KubeMQ SDK for Go enables Go developers to seamlessly communicate with the KubeMQ server, implementing various communication patterns such as Events, EventStore, Commands, Queries, and Queues.
Table of Contents
<!-- TOC -->
<!-- TOC -->
Prerequisites
- Go SDK 1.17 higher
- KubeMQ server running locally or accessible over the network
Installation
go get github.com/kubemq-io/kubemq-go
Running Examples
The examples are standalone projects that showcase the usage of the SDK. To run the examples, ensure you have a running instance of KubeMQ.
SDK Overview
The SDK implements all communication patterns available through the KubeMQ server:
- PubSub
- Commands & Queries (CQ)
- Queues
PubSub Events Operations
Create Channel
Create a new Events channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func createEventsChannel() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventsClient, err := kubemq.NewEventsClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("events-channel-creator"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := eventsClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := eventsClient.Create(ctx, "events-channel"); err != nil {
log.Fatal(err)
}
}
Delete Channel
Delete an existing Events channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func deleteEventsChannel() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventsClient, err := kubemq.NewEventsClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("events-channel-delete"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := eventsClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := eventsClient.Delete(ctx, "events-channel"); err != nil {
log.Fatal(err)
}
}
List Channels
Retrieve a list of Events channels.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Response
Returns a list where each PubSubChannel
has the following attributes:
Name | Type | Description |
---|
Name | String | The name of the Pub/Sub channel. |
Type | String | The type of the Pub/Sub channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func listEventsChannels() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventsClient, err := kubemq.NewEventsClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("events-channel-lister"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := eventsClient.Close()
if err != nil {
log.Fatal(err)
}
}()
channels, err := eventsClient.List(ctx, "")
if err != nil {
log.Fatal(err)
}
for _, channel := range channels {
log.Println(channel)
}
}
Send Event / Subscribe Message
Sends a message to an Events channel.
Send Request: Event
Name | Type | Description | Default Value | Mandatory |
---|
Id | String | Unique identifier for the event message. | None | No |
Channel | String | The channel to which the event message is sent. | None | Yes |
Metadata | String | Metadata associated with the event message. | None | No |
Body | byte[] | Body of the event message in bytes. | Empty byte array | No |
Tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |
Send Response
Name | Type | Description |
---|
Err | error | Error message if any |
Subscribe Request: EventsSubscription
Name | Type | Description | Default Value | Mandatory |
---|
Channel | String | The channel to subscribe to. | None | Yes |
Group | String | The group to subscribe with. | None | No |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
func sendSubscribeEvents() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventsClient, err := kubemq.NewEventsClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("events-send-subscribe"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := eventsClient.Close()
if err != nil {
log.Fatal(err)
}
}()
subReq := &kubemq.EventsSubscription{
Channel: "events-channel",
Group: "",
ClientId: "",
}
err = eventsClient.Subscribe(ctx, subReq, func(msg *kubemq.Event, err error) {
log.Println(msg.String())
})
if err != nil {
log.Fatal(err)
}
time.Sleep(300 * time.Second)
err = eventsClient.Send(ctx, &kubemq.Event{
Channel: "events-channel",
Metadata: "some-metadata",
Body: []byte("hello kubemq - sending event"),
})
if err != nil {
log.Fatal(err)
}
time.Sleep(1 * time.Second)
}
PubSub EventsStore Operations
Create Channel
Create a new Events Store channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func createEventsStoreChannel() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("events-store-channel-creator"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := eventsStoreClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := eventsStoreClient.Create(ctx, "events-store-channel"); err != nil {
log.Fatal(err)
}
}
Delete Channel
Delete an existing Events Store channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func deleteEventsStoreChannel() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("events-store-channel-delete"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := eventsStoreClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := eventsStoreClient.Delete(ctx, "events-store-channel"); err != nil {
log.Fatal(err)
}
}
List Channels
Retrieve a list of Events channels.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Response
Returns a list where each PubSubChannel
has the following attributes:
Name | Type | Description |
---|
Name | String | The name of the Pub/Sub channel. |
Type | String | The type of the Pub/Sub channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func listEventsStoreChannel() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("events-store-channel-lister"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := eventsStoreClient.Close()
if err != nil {
log.Fatal(err)
}
}()
channels, err := eventsStoreClient.List(ctx, "")
if err != nil {
log.Fatal(err)
}
for _, channel := range channels {
log.Println(channel)
}
}
Send Event / Subscribe Message
Sends a message to an Events channel.
Send Request: Event
Name | Type | Description | Default Value | Mandatory |
---|
Id | String | Unique identifier for the event message. | None | No |
Channel | String | The channel to which the event message is sent. | None | Yes |
Metadata | String | Metadata associated with the event message. | None | No |
Body | byte[] | Body of the event message in bytes. | Empty byte array | No |
Tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |
Send Response
Name | Type | Description |
---|
Err | error | Error message if any |
Subscribe Request: EventsStoreSubscription
Name | Type | Description | Default Value | Mandatory |
---|
Channel | String | The channel to subscribe to. | None | Yes |
Group | String | The group to subscribe with. | None | No |
SubscriptionType | EventsStoreSubscription | The Subscription | None | Yes |
EventsStoreType Options
Type | Value | Description |
---|
StartNewOnly | 1 | Start storing events from the point when the subscription is made |
StartFromFirst | 2 | Start storing events from the first event available |
StartFromLast | 3 | Start storing events from the last event available |
StartAtSequence | 4 | Start storing events from a specific sequence number |
StartAtTime | 5 | Start storing events from a specific point in time |
StartAtTimeDelta | 6 | Start storing events from a specific time delta in seconds |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
func sendSubscribeEventsStore() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("events-store-send-subscribe"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := eventsStoreClient.Close()
if err != nil {
log.Fatal(err)
}
}()
subReq := &kubemq.EventsStoreSubscription{
Channel: "events-store-channel",
Group: "",
ClientId: "",
SubscriptionType: kubemq.StartFromFirstEvent(),
}
err = eventsStoreClient.Subscribe(ctx, subReq, func(msg *kubemq.EventStoreReceive, err error) {
log.Println(msg.String())
})
if err != nil {
log.Fatal(err)
}
time.Sleep(1 * time.Second)
result, err := eventsStoreClient.Send(ctx, &kubemq.EventStore{
Channel: "events-store-channel",
Metadata: "some-metadata",
Body: []byte("hello kubemq - sending event store"),
})
if err != nil {
log.Fatal(err)
}
log.Println(result)
time.Sleep(1 * time.Second)
}
Commands & Queries – Commands Operations
Create Channel
Create a new Command channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func createCommandsChannel() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
commandsClient, err := kubemq.NewCommandsClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := commandsClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := commandsClient.Create(ctx, "commands.A"); err != nil {
log.Fatal(err)
}
}
Delete Channel
Delete an existing Command channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func deleteCommandsChannel() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
commandsClient, err := kubemq.NewCommandsClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := commandsClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := commandsClient.Delete(ctx, "commands.A"); err != nil {
log.Fatal(err)
}
}
List Channels
Retrieve a list of Command channels.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Response
Returns a list where each CQChannel
has the following attributes:
Name | Type | Description |
---|
Name | String | The name of the Pub/Sub channel. |
Type | String | The type of the Pub/Sub channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func listCommandsChannels() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
commandsClient, err := kubemq.NewCommandsClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := commandsClient.Close()
if err != nil {
log.Fatal(err)
}
}()
channels, err := commandsClient.List(ctx, "")
if err != nil {
log.Fatal(err)
}
for _, channel := range channels {
log.Println(channel)
}
}
Send Command / Receive Request
Sends a command request to a Command channel.
Send Request: CommandMessage
Name | Type | Description | Default Value | Mandatory |
---|
Id | String | The ID of the command message. | None | Yes |
Channel | String | The channel through which the command message will be sent. | None | Yes |
Metadata | String | Additional metadata associated with the command message. | None | No |
Body | byte[] | The body of the command message as bytes. | Empty byte array | No |
Tags | Map<String, String> | A dictionary of key-value pairs representing tags associated with the command message. | Empty Map | No |
Timeout | Duration | The maximum time duration for waiting to response. | None | Yes |
Send Response: CommandResponseMessage
Name | Type | Description |
---|
CommandId | String | Command Id |
ResponseClientId | String | The client ID associated with the command response. |
Executed | boolean | Indicates if the command has been executed. |
ExecutedAt | time | The timestamp when the command response was created. |
Error | String | The error message if there was an error. |
Receive Request: CommandsSubscription
Name | Type | Description | Default Value | Mandatory |
---|
Channel | String | The channel for the subscription. | None | Yes |
Group | String | The group associated with the subscription. | None | No |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
func sendReceiveCommands() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
commandsClient, err := kubemq.NewCommandsClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("sendReceiveCommands"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := commandsClient.Close()
if err != nil {
log.Fatal(err)
}
}()
subRequest := &kubemq.CommandsSubscription{
Channel: "commands",
ClientId: "",
Group: "",
}
log.Println("subscribing to commands")
err = commandsClient.Subscribe(ctx, subRequest, func(cmd *kubemq.CommandReceive, err error) {
log.Println(cmd.String())
resp := &kubemq.Response{
RequestId: cmd.Id,
ResponseTo: cmd.ResponseTo,
Metadata: "some-metadata",
ExecutedAt: time.Now(),
}
if err := commandsClient.Response(ctx, resp); err != nil {
log.Fatal(err)
}
})
if err != nil {
log.Fatal(err)
}
time.Sleep(1 * time.Second)
log.Println("sending command")
result, err := commandsClient.Send(ctx, kubemq.NewCommand().
SetChannel("commands").
SetMetadata("some-metadata").
SetBody([]byte("hello kubemq - sending command")).
SetTimeout(time.Duration(10)*time.Second))
if err != nil {
log.Fatal(err)
}
log.Println(result)
}
Commands & Queries – Queries Operations
Create Channel
Create a new Query channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func createQueriesChannel() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queriesClient, err := kubemq.NewQueriesClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queriesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := queriesClient.Create(ctx, "queries.A"); err != nil {
log.Fatal(err)
}
}
Delete Channel
Delete an existing Query channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func deleteQueriesChannel() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queriesClient, err := kubemq.NewQueriesClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queriesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := queriesClient.Delete(ctx, "queries.A"); err != nil {
log.Fatal(err)
}
}
List Channels
Retrieve a list of Query channels.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Response
Returns a list where each CQChannel
has the following attributes:
Name | Type | Description |
---|
Name | String | The name of the Pub/Sub channel. |
Type | String | The type of the Pub/Sub channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)
func listQueriesChannels() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queriesClient, err := kubemq.NewQueriesClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queriesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
channels, err := queriesClient.List(ctx, "")
if err != nil {
log.Fatal(err)
}
for _, channel := range channels {
log.Println(channel)
}
}
Send Query / Receive Request
Sends a query request to a Query channel.
Send Request: QueryMessage
Name | Type | Description | Default Value | Mandatory |
---|
Id | String | The ID of the query message. | None | Yes |
Channel | String | The channel through which the query message will be sent. | None | Yes |
Metadata | String | Additional metadata associated with the query message. | None | No |
Body | byte[] | The body of the query message as bytes. | Empty byte array | No |
Tags | Map<String, String> | A dictionary of key-value pairs representing tags associated with the query message. | Empty Map | No |
Timeout | Duration | The maximum time duration for waiting to response. | None | Yes |
Send Response: QueryResponse
Name | Type | Description |
---|
QueryId | String | Query Id |
ResponseClientId | String | The client ID associated with the query response. |
Executed | boolean | Indicates if the query has been executed. |
Metadata | String | Additional metadata associated with the query response message. |
Body | byte[] | The body of the query response message as bytes. |
Tags | Map<String, String> | A dictionary of key-value pairs representing tags associated with the query response message. |
ExecutedAt | time | The timestamp when the query response was created. |
Error | String | The error message if there was an error. |
Receive Request: QuerySubscription
Name | Type | Description | Default Value | Mandatory |
---|
Channel | String | The channel for the subscription. | None | Yes |
Group | String | The group associated with the subscription. | None | No |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
"time"
)
func sendReceiveQueries() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queriesClient, err := kubemq.NewQueriesClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("sendReceiveQueries"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queriesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
subRequest := &kubemq.QueriesSubscription{
Channel: "queries",
ClientId: "",
Group: "",
}
log.Println("subscribing to queries")
err = queriesClient.Subscribe(ctx, subRequest, func(query *kubemq.QueryReceive, err error) {
log.Println(query.String())
resp := &kubemq.Response{
RequestId: query.Id,
ResponseTo: query.ResponseTo,
Metadata: "some-metadata",
ExecutedAt: time.Now(),
Body: []byte("hello kubemq - sending query response"),
}
if err := queriesClient.Response(ctx, resp); err != nil {
log.Fatal(err)
}
})
if err != nil {
log.Fatal(err)
}
time.Sleep(1 * time.Second)
log.Println("sending query")
result, err := queriesClient.Send(ctx, kubemq.NewQuery().
SetChannel("queries").
SetMetadata("some-metadata").
SetBody([]byte("hello kubemq - sending query")).
SetTimeout(time.Duration(10)*time.Second))
if err != nil {
log.Fatal(err)
}
log.Println(result)
}
Queues Operations
Create Channel
Create a new Queue channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
Ctx | context | The context for the request. | None | Yes |
ChannelName | String | Name of the channel you want to create | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
queues_stream.WithAddress("localhost", 50000),
queues_stream.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queuesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := queuesClient.Create(ctx, "queues.A"); err != nil {
log.Fatal(err)
}
}
Delete Channel
Delete an existing Queue channel.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
ChannelName | String | Name of the channel you want to delete | None | Yes |
Response
Name | Type | Description |
---|
Err | error | Error message if any |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
queues_stream.WithAddress("localhost", 50000),
queues_stream.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queuesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := queuesClient.Delete(ctx, "queues.A"); err != nil {
log.Fatal(err)
}
}
List Channels
Retrieve a list of Queue channels.
Request Parameters
Name | Type | Description | Default Value | Mandatory |
---|
SearchQuery | String | Search query to filter channels (optional) | None | No |
Response
Returns a list where each QueuesChannel
has the following attributes:
Name | Type | Description |
---|
Name | String | The name of the Pub/Sub channel. |
Type | String | The type of the Pub/Sub channel. |
LastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
IsActive | boolean | Indicates whether the channel is active or not. |
Incoming | PubSubStats | The statistics related to incoming messages for this channel. |
Outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
queues_stream.WithAddress("localhost", 50000),
queues_stream.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queuesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
channels, err := queuesClient.List(ctx, "")
if err != nil {
log.Fatal(err)
}
for _, channel := range channels {
log.Println(channel)
}
}
Send / Receive Queue Messages
Send and receive messages from a Queue channel.
Send Request: QueueMessage
Name | Type | Description | Default Value | Mandatory |
---|
Id | String | The unique identifier for the message. | None | No |
Channel | String | The channel of the message. | None | Yes |
Metadata | String | The metadata associated with the message. | None | No |
Body | byte[] | The body of the message. | new byte[0] | No |
Tags | Map<String, String> | The tags associated with the message. | new HashMap<>() | No |
PolicyDelaySeconds | int | The delay in seconds before the message becomes available in the queue. | None | No |
PolicyExpirationSeconds | int | The expiration time in seconds for the message. | None | No |
PolicyMaxReceiveCount | int | The number of receive attempts allowed for the message before it is moved to the dead letter queue. | None | No |
PolicyMaxReceiveQueue | String | The dead letter queue where the message will be moved after reaching the maximum receive attempts. | None | No |
Send Response: SendResult
Name | Type | Description |
---|
Id | String | The unique identifier of the message. |
SentAt | LocalDateTime | The timestamp when the message was sent. |
ExpiredAt | LocalDateTime | The timestamp when the message will expire. |
DelayedTo | LocalDateTime | The timestamp when the message will be delivered. |
IsError | boolean | Indicates if there was an error while sending the message. |
Error | String | The error message if isError is true. |
Receive Request: PollRequest
Name | Type | Description | Default Value | Mandatory |
---|
Channel | String | The channel to poll messages from. | None | Yes |
MaxItems | int | The maximum number of messages to poll. | 1 | No |
WaitTimeout | int | The wait timeout in seconds for polling messages. | 60 | No |
AutoAck | boolean | Indicates if messages should be auto-acknowledged. | false | No |
VisibilitySeconds | int | Add a visibility timeout feature for messages. | 0 | No |
Response: PollResponse
Name | Type | Description |
---|
Messages | List<QueueMessage> | The list of received queue messages. |
Response: QueueMessage
Name | Type | Description |
---|
Id | String | The unique identifier for the message. |
Channel | String | The channel from which the message was received. |
Metadata | String | Metadata associated with the message. |
Body | byte[] | The body of the message in byte array format. |
ClientID | String | The ID of the client that sent the message. |
Tags | Map<String, String> | Key-value pairs representing tags for the message. |
Timestamp | Instant | The timestamp when the message was created. |
Sequence | long | The sequence number of the message. |
ReceiveCount | int | The number of times the message has been received. |
ReRouted | boolean | Indicates whether the message was rerouted. |
ReRoutedFromQueue | String | The name of the queue from which the message was rerouted. |
ExpirationAt | Instant | The expiration time of the message, if applicable. |
DelayedTo | Instant | The time the message is delayed until, if applicable. |
Example
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
"time"
)
func sendAndReceive() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
queues_stream.WithAddress("localhost", 50000),
queues_stream.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queuesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
msg := queues_stream.NewQueueMessage().
SetId("message_id").
SetChannel("sendAndReceive").
SetMetadata("some-metadata").
SetBody([]byte("hello world from KubeMQ queue")).
SetTags(map[string]string{
"key1": "value1",
"key2": "value2",
}).
SetPolicyDelaySeconds(1).
SetPolicyExpirationSeconds(10).
SetPolicyMaxReceiveCount(3).
SetPolicyMaxReceiveQueue("dlq")
result, err := queuesClient.Send(ctx, msg)
if err != nil {
log.Fatal(err)
}
log.Println(result)
pollRequest := queues_stream.NewPollRequest().
SetChannel("sendAndReceive").
SetMaxItems(1).
SetWaitTimeout(10).
SetAutoAck(true)
msgs, err := queuesClient.Poll(ctx, pollRequest)
//if err != nil {
// log.Fatal(err)
//}
//AckAll - Acknowledge all messages
//if err := msgs.AckAll(); err != nil {
// log.Fatal(err)
//}
//NackAll - Not Acknowledge all messages
//if err := msgs.NAckAll(); err != nil {
// log.Fatal(err)
//}
// RequeueAll - Requeue all messages
//if err := msgs.ReQueueAll("requeue-queue-channel"); err != nil {
// log.Fatal(err)
//}
for _, msg := range msgs.Messages {
log.Println(msg.String())
// Ack - Acknowledge message
if err := msg.Ack(); err != nil {
log.Fatal(err)
}
// Nack - Not Acknowledge message
//if err := msg.NAck(); err != nil {
// log.Fatal(err)
//}
// Requeue - Requeue message
//if err := msg.ReQueue("requeue-queue-channel"); err != nil {
// log.Fatal(err)
//}
}
}
Example with Visibility
package main
import (
"context"
"github.com/kubemq-io/kubemq-go/queues_stream"
"log"
"time"
)
func sendAndReceiveWithVisibilityExpiration() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
queues_stream.WithAddress("localhost", 50000),
queues_stream.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queuesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
msg := queues_stream.NewQueueMessage().
SetId("message_id").
SetChannel("sendAndReceiveWithVisibility").
SetMetadata("some-metadata").
SetBody([]byte("hello world from KubeMQ queue - with visibility"))
result, err := queuesClient.Send(ctx, msg)
if err != nil {
log.Fatal(err)
}
log.Println(result)
pollRequest := queues_stream.NewPollRequest().
SetChannel("sendAndReceiveWithVisibility").
SetMaxItems(1).
SetWaitTimeout(10).
SetVisibilitySeconds(2)
msgs, err := queuesClient.Poll(ctx, pollRequest)
for _, msg := range msgs.Messages {
log.Println(msg.String())
log.Println("Received message, waiting 3 seconds before ack")
time.Sleep(3 * time.Second)
if err := msg.Ack(); err != nil {
log.Fatal(err)
}
}
}
func sendAndReceiveWithVisibilityExtension() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
queues_stream.WithAddress("localhost", 50000),
queues_stream.WithClientId("example"))
if err != nil {
log.Fatal(err)
}
defer func() {
err := queuesClient.Close()
if err != nil {
log.Fatal(err)
}
}()
msg := queues_stream.NewQueueMessage().
SetId("message_id").
SetChannel("sendAndReceiveWithVisibility").
SetMetadata("some-metadata").
SetBody([]byte("hello world from KubeMQ queue - with visibility"))
result, err := queuesClient.Send(ctx, msg)
if err != nil {
log.Fatal(err)
}
log.Println(result)
pollRequest := queues_stream.NewPollRequest().
SetChannel("sendAndReceiveWithVisibility").
SetMaxItems(1).
SetWaitTimeout(10).
SetVisibilitySeconds(2)
msgs, err := queuesClient.Poll(ctx, pollRequest)
for _, msg := range msgs.Messages {
log.Println(msg.String())
log.Println("Received message, waiting 1 seconds before ack")
time.Sleep(1 * time.Second)
log.Println("Extending visibility for 3 seconds and waiting 2 seconds before ack")
if err := msg.ExtendVisibility(3); err != nil {
log.Fatal(err)
}
time.Sleep(2 * time.Second)
if err := msg.Ack(); err != nil {
log.Fatal(err)
}
}
}