Skip to content

Commit

Permalink
v1.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
liornabat committed Apr 28, 2024
1 parent 7f95631 commit 9621d85
Show file tree
Hide file tree
Showing 56 changed files with 973 additions and 1,716 deletions.
24 changes: 18 additions & 6 deletions commands_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubemq
import (
"context"
"fmt"
"github.com/kubemq-io/kubemq-go/common"
)

type CommandsClient struct {
Expand Down Expand Up @@ -42,21 +43,21 @@ func NewCommandsClient(ctx context.Context, op ...Option) (*CommandsClient, erro
}

func (c *CommandsClient) Send(ctx context.Context, request *Command) (*CommandResponse, error) {
if err:=c.isClientReady();err!=nil{
return nil,err
if err := c.isClientReady(); err != nil {
return nil, err
}
request.transport = c.client.transport
return c.client.SetCommand(request).Send(ctx)
}
func (c *CommandsClient) Response(ctx context.Context, response *Response) error {
if err:=c.isClientReady();err!=nil{
if err := c.isClientReady(); err != nil {
return err
}
response.transport = c.client.transport
return c.client.SetResponse(response).Send(ctx)
}
func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscription, onCommandReceive func(cmd *CommandReceive, err error)) error {
if err:=c.isClientReady();err!=nil{
if err := c.isClientReady(); err != nil {
return err
}
if onCommandReceive == nil {
Expand Down Expand Up @@ -85,13 +86,24 @@ func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscri
return nil
}

func (c *CommandsClient) Create(ctx context.Context, channel string) error {
return CreateChannel(ctx, c.client, c.client.opts.clientId, channel, "commands")
}

func (c *CommandsClient) Delete(ctx context.Context, channel string) error {
return DeleteChannel(ctx, c.client, c.client.opts.clientId, channel, "commands")
}

func (c *CommandsClient) List(ctx context.Context, search string) ([]*common.CQChannel, error) {
return ListCQChannels(ctx, c.client, c.client.opts.clientId, "commands", search)
}

func (c *CommandsClient) Close() error {
return c.client.Close()
}


func (c *CommandsClient) isClientReady() error {
if c.client==nil {
if c.client == nil {
return fmt.Errorf("client is not initialized")
}
return nil
Expand Down
140 changes: 140 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package kubemq

import (
"context"
"encoding/json"
"fmt"
"github.com/kubemq-io/kubemq-go/common"
"github.com/kubemq-io/kubemq-go/pkg/uuid"
"time"
)

const requestChannel = "kubemq.cluster.internal.requests"

func CreateChannel(ctx context.Context, client *Client, clientId string, channel string, channelType string) error {
request := NewQuery().
SetChannel(requestChannel).
SetId(uuid.New()).
SetClientId(clientId).
SetMetadata("create-channel").
SetTags(map[string]string{
"channel_type": channelType,
"channel": channel,
"client_id": clientId,
}).
SetTimeout(time.Second * 10)
resp, err := client.SetQuery(request).Send(ctx)
if err != nil {
return fmt.Errorf("error sending create channel request: %s", err.Error())
}
if resp.Error != "" {
return fmt.Errorf("error creating channel: %s", resp.Error)
}

return nil
}

func DeleteChannel(ctx context.Context, client *Client, clientId string, channel string, channelType string) error {
request := NewQuery().
SetChannel(requestChannel).
SetId(uuid.New()).
SetClientId(clientId).
SetMetadata("delete-channel").
SetTags(map[string]string{
"channel_type": channelType,
"channel": channel,
"client_id": clientId,
}).
SetTimeout(time.Second * 10)
resp, err := client.SetQuery(request).Send(ctx)
if err != nil {
return fmt.Errorf("error sending delete channel request: %s", err.Error())
}
if resp.Error != "" {
return fmt.Errorf("error deleting channel: %s", resp.Error)
}
return nil
}

func listChannels(ctx context.Context, client *Client, clientId string, channelType string, search string) ([]byte, error) {
request := NewQuery().
SetChannel(requestChannel).
SetId(uuid.New()).
SetClientId(clientId).
SetMetadata("list-channels").
SetTags(map[string]string{
"channel_type": channelType,
"client_id": clientId,
"search": search,
}).
SetTimeout(time.Second * 10)
resp, err := client.SetQuery(request).Send(ctx)
if err != nil {
return nil, fmt.Errorf("error sending list channels request: %s", err.Error())
}
if resp.Error != "" {
return nil, fmt.Errorf("error listing channels: %s", resp.Error)
}
return resp.Body, nil
}
func ListQueuesChannels(ctx context.Context, client *Client, clientId string, search string) ([]*common.QueuesChannel, error) {
data, err := listChannels(ctx, client, clientId, "queues", search)
if err != nil {
return nil, err

}
return DecodeQueuesChannelList(data)
}

func ListPubSubChannels(ctx context.Context, client *Client, clientId string, channelType string, search string) ([]*common.PubSubChannel, error) {
data, err := listChannels(ctx, client, clientId, channelType, search)
if err != nil {
return nil, err

}
return DecodePubSubChannelList(data)
}

func ListCQChannels(ctx context.Context, client *Client, clientId string, channelType string, search string) ([]*common.CQChannel, error) {
data, err := listChannels(ctx, client, clientId, channelType, search)
if err != nil {
return nil, err
}
return DecodeCQChannelList(data)
}

func DecodePubSubChannelList(dataBytes []byte) ([]*common.PubSubChannel, error) {
var channelsData []*common.PubSubChannel
if dataBytes == nil {
return nil, nil
}
err := json.Unmarshal(dataBytes, &channelsData)
if err != nil {
return nil, err
}
return channelsData, nil
}

func DecodeQueuesChannelList(dataBytes []byte) ([]*common.QueuesChannel, error) {
var channelsData []*common.QueuesChannel
if dataBytes == nil {
return nil, nil
}
err := json.Unmarshal(dataBytes, &channelsData)
if err != nil {
return nil, err
}
return channelsData, nil
}

func DecodeCQChannelList(dataBytes []byte) ([]*common.CQChannel, error) {
var channelsData []*common.CQChannel
if dataBytes == nil {
return nil, nil
}
err := json.Unmarshal(dataBytes, &channelsData)
if err != nil {
return nil, err
}
return channelsData, nil
}
75 changes: 75 additions & 0 deletions common/channel_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package common

import (
"fmt"
)

type QueuesStats struct {
Messages int `json:"messages"`
Volume int `json:"volume"`
Waiting int `json:"waiting"`
Expired int `json:"expired"`
Delayed int `json:"delayed"`
}

func (qs *QueuesStats) String() string {
return fmt.Sprintf("Stats: messages=%d, volume=%d, waiting=%d, expired=%d, delayed=%d", qs.Messages, qs.Volume, qs.Waiting, qs.Expired, qs.Delayed)
}

type QueuesChannel struct {
Name string `json:"name"`
Type string `json:"type"`
LastActivity int `json:"lastActivity"`
IsActive bool `json:"isActive"`
Incoming QueuesStats `json:"incoming"`
Outgoing QueuesStats `json:"outgoing"`
}

func (qc *QueuesChannel) String() string {
return fmt.Sprintf("Channel: name=%s, type=%s, last_activity=%d, is_active=%t, incoming=%v, outgoing=%v", qc.Name, qc.Type, qc.LastActivity, qc.IsActive, qc.Incoming, qc.Outgoing)
}

type PubSubStats struct {
Messages int `json:"messages"`
Volume int `json:"volume"`
}

func (ps *PubSubStats) String() string {
return fmt.Sprintf("Stats: messages=%d, volume=%d", ps.Messages, ps.Volume)
}

type PubSubChannel struct {
Name string `json:"name"`
Type string `json:"type"`
LastActivity int `json:"lastActivity"`
IsActive bool `json:"isActive"`
Incoming PubSubStats `json:"incoming"`
Outgoing PubSubStats `json:"outgoing"`
}

func (pc *PubSubChannel) String() string {
return fmt.Sprintf("Channel: name=%s, type=%s, last_activity=%d, is_active=%t, incoming=%v, outgoing=%v", pc.Name, pc.Type, pc.LastActivity, pc.IsActive, pc.Incoming, pc.Outgoing)
}

type CQStats struct {
Messages int `json:"messages"`
Volume int `json:"volume"`
Responses int `json:"responses"`
}

func (cs *CQStats) String() string {
return fmt.Sprintf("Stats: messages=%d, volume=%d, responses=%d", cs.Messages, cs.Volume, cs.Responses)
}

type CQChannel struct {
Name string `json:"name"`
Type string `json:"type"`
LastActivity int `json:"lastActivity"`
IsActive bool `json:"isActive"`
Incoming CQStats `json:"incoming"`
Outgoing CQStats `json:"outgoing"`
}

func (cc *CQChannel) String() string {
return fmt.Sprintf("Channel: name=%s, type=%s, last_activity=%d, is_active=%t, incoming=%v, outgoing=%v", cc.Name, cc.Type, cc.LastActivity, cc.IsActive, cc.Incoming, cc.Outgoing)
}
22 changes: 17 additions & 5 deletions events_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubemq
import (
"context"
"fmt"
"github.com/kubemq-io/kubemq-go/common"
)

type EventsMessageHandler func(*Event)
Expand Down Expand Up @@ -44,16 +45,16 @@ func NewEventsClient(ctx context.Context, op ...Option) (*EventsClient, error) {
}

func (e *EventsClient) Send(ctx context.Context, message *Event) error {
if err:=e.isClientReady();err!=nil{
if err := e.isClientReady(); err != nil {
return err
}
message.transport = e.client.transport
return e.client.SetEvent(message).Send(ctx)
}

func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (func(msg *Event) error, error) {
if err:=e.isClientReady();err!=nil{
return nil,err
if err := e.isClientReady(); err != nil {
return nil, err
}
if onError == nil {
return nil, fmt.Errorf("events stream error callback function is required")
Expand Down Expand Up @@ -84,7 +85,7 @@ func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (fun
}

func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscription, onEvent func(msg *Event, err error)) error {
if err:=e.isClientReady();err!=nil{
if err := e.isClientReady(); err != nil {
return err
}
if onEvent == nil {
Expand Down Expand Up @@ -113,12 +114,23 @@ func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscriptio
return nil
}

func (e *EventsClient) Create(ctx context.Context, channel string) error {
return CreateChannel(ctx, e.client, e.client.opts.clientId, channel, "events")
}

func (e *EventsClient) Delete(ctx context.Context, channel string) error {
return DeleteChannel(ctx, e.client, e.client.opts.clientId, channel, "events")
}

func (e *EventsClient) List(ctx context.Context, search string) ([]*common.PubSubChannel, error) {
return ListPubSubChannels(ctx, e.client, e.client.opts.clientId, "events", search)
}
func (e *EventsClient) Close() error {
return e.client.Close()
}

func (e *EventsClient) isClientReady() error {
if e.client==nil {
if e.client == nil {
return fmt.Errorf("client is not initialized")
}
return nil
Expand Down
13 changes: 13 additions & 0 deletions events_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubemq
import (
"context"
"fmt"
"github.com/kubemq-io/kubemq-go/common"
)

type EventsStoreClient struct {
Expand Down Expand Up @@ -120,6 +121,18 @@ func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStore
return nil
}

func (es *EventsStoreClient) Create(ctx context.Context, channel string) error {
return CreateChannel(ctx, es.client, es.client.opts.clientId, channel, "events_store")
}

func (es *EventsStoreClient) Delete(ctx context.Context, channel string) error {
return DeleteChannel(ctx, es.client, es.client.opts.clientId, channel, "events_store")
}

func (es *EventsStoreClient) List(ctx context.Context, search string) ([]*common.PubSubChannel, error) {
return ListPubSubChannels(ctx, es.client, es.client.opts.clientId, "events_store", search)
}

func (es *EventsStoreClient) Close() error {
if err := es.isClientReady(); err != nil {
return err
Expand Down
28 changes: 28 additions & 0 deletions examples/pubsub/events-store/create/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"context"
"github.com/kubemq-io/kubemq-go"
"log"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventsStoreClient, err := kubemq.NewEventsStoreClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId("example"),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
}
defer func() {
err := eventsStoreClient.Close()
if err != nil {
log.Fatal(err)
}
}()
if err := eventsStoreClient.Create(ctx, "events-store.single"); err != nil {
log.Fatal(err)
}
}
Loading

0 comments on commit 9621d85

Please sign in to comment.