Skip to content

Commit

Permalink
1.cache组件新增list支持,2.代码和注释优化
Browse files Browse the repository at this point in the history
  • Loading branch information
keepchen committed Dec 29, 2023
1 parent 47a7766 commit 90c4c88
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 28 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.3.1
github.com/jinzhu/configor v1.2.1
github.com/keepchen/message-queue v0.0.5
github.com/nacos-group/nacos-sdk-go/v2 v2.1.2
github.com/nats-io/nats.go v1.26.0
github.com/pelletier/go-toml/v2 v2.0.8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/keepchen/message-queue v0.0.5 h1:6r1bwn9P/c3DBN2Q9spWl3VwwCkyWPE6BpHfek7lKaM=
github.com/keepchen/message-queue v0.0.5/go.mod h1:/k0O82UGWVfLEVRAXNgiNzYaw2NJ0C9qIVAhBSgewow=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
Expand Down
3 changes: 3 additions & 0 deletions http/middleware/requestentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func RequestEntry() gin.HandlerFunc {
spanId string
)
requestIdInHeader := c.Request.Header.Get("requestId")
if len(requestIdInHeader) == 0 {
requestIdInHeader = c.Request.Header.Get("X-Request-Id")
}
if len(requestIdInHeader) > 0 {
requestId = requestIdInHeader
spanId = uuid.New().String()
Expand Down
17 changes: 11 additions & 6 deletions lib/cache/local.go → lib/cache/localkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"github.com/keepchen/go-sail/v3/utils"
)

type value struct {
type localCacheValue struct {
expiredAt int64
content interface{}
}

type localCache struct {
mux *sync.Mutex
maps map[string]*value
maps map[string]*localCacheValue
}

var lc *localCache
Expand All @@ -23,7 +23,7 @@ func init() {
(&sync.Once{}).Do(func() {
lc = &localCache{
mux: &sync.Mutex{},
maps: make(map[string]*value),
maps: make(map[string]*localCacheValue),
}
})
}
Expand All @@ -39,7 +39,7 @@ func Put(key string, val interface{}, expiredTimeDuration ...time.Duration) bool
expiredAt = utils.NewTimeWithTimeZone().Now().Add(time.Hour).Unix()
}
lc.mux.Lock()
lc.maps[key] = &value{expiredAt: expiredAt, content: val}
lc.maps[key] = &localCacheValue{expiredAt: expiredAt, content: val}
lc.mux.Unlock()

return true
Expand Down Expand Up @@ -77,11 +77,16 @@ func Forget(key string) bool {

// Expire 为key设置过期时间
func Expire(key string, expiredTimeDuration time.Duration) bool {
var optOk bool

expiredAt := utils.NewTimeWithTimeZone().Now().Add(expiredTimeDuration).Unix()

lc.mux.Lock()
lc.maps[key].expiredAt = expiredAt
if _, ok := lc.maps[key]; ok {
lc.maps[key].expiredAt = expiredAt
optOk = true
}
lc.mux.Unlock()

return true
return optOk
}
54 changes: 54 additions & 0 deletions lib/cache/localkv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package cache

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

var (
key = "test-local-cache"
values = []string{"a", "b", "c", "d"}
)

func TestPut(t *testing.T) {
for _, v := range values {
ok := Put(key, v)
assert.Equal(t, true, ok)
}
}

func TestGet(t *testing.T) {
for _, v := range values {
ok := Put(key, v)
assert.Equal(t, true, ok)
getV, ret := Get(key)
assert.Equal(t, v, getV.(string))
assert.Equal(t, 0, ret)
}
}

func TestForget(t *testing.T) {
for _, v := range values {
ok := Put(key, v)
assert.Equal(t, true, ok)

ok = Forget(key)
assert.Equal(t, true, ok)

getV, ret := Get(key)
assert.Equal(t, nil, getV)
assert.Equal(t, -2, ret)
}
}

func TestExpire(t *testing.T) {
for _, v := range values {
ok := Put(key, v)
assert.Equal(t, true, ok)

ok = Expire(key, time.Minute)
assert.Equal(t, true, ok)
}
}
24 changes: 24 additions & 0 deletions lib/cache/locallist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package cache

import "github.com/keepchen/message-queue/queue"

var listInstance *queue.Instance

// InitList 初始化列表
func InitList() {
listInstance = queue.GetDBInstance("<localList>")
listInstance.SetDebugMode(false)
}

// GetListInstance 获取列表实例
func GetListInstance() *queue.Instance {
return listInstance
}

// NewList 新建列表
func NewList(listName string) *queue.Instance {
instance := queue.GetDBInstance(listName)
instance.SetDebugMode(false)

return instance
}
42 changes: 22 additions & 20 deletions lib/email/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,30 @@ func (p *Pool) Mount(index int, envelope *Envelope) {

// Emit 启动队列
func (p *Pool) Emit() {
for index, worker := range p.workers {
go func(index int, wk chan *Envelope) {
for {
select {
case ep := <-wk:
err := p.send(ep)
if err != nil {
log.Println("[GO-SAIL] <Email> send failure via worker, error:", err)
}
//处理上层回调函数
if ep.Callback != nil {
ep.Callback(ep, err)
}
p.wg.Done()
if p.throttle > 0 {
time.Sleep(p.throttle)
}
case <-p.exit:
break
handler := func(index int, wk chan *Envelope) {
LOOP:
for {
select {
case ep := <-wk:
err := p.send(ep)
if err != nil {
log.Println("[GO-SAIL] <Email> send failure via worker, error:", err)
}
//处理上层回调函数
if ep.Callback != nil {
ep.Callback(ep, err)
}
p.wg.Done()
if p.throttle > 0 {
time.Sleep(p.throttle)
}
case <-p.exit:
break LOOP
}
}(index, worker)
}
}
for index, worker := range p.workers {
go handler(index, worker)
}
}

Expand Down
2 changes: 0 additions & 2 deletions lib/nacos/namingclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ func RegisterService(groupName, serviceName string, ip string, port uint64, meta
// @param ip 访问ip地址,如果为空,则使用 utils.GetLocalIP 自动获取
//
// @param port 监听的端口
//
// @param metadata 元数据信息
func UnregisterService(groupName, serviceName string, ip string, port uint64) (bool, error) {
var param vo.DeregisterInstanceParam
param.Ip = ip
Expand Down

0 comments on commit 90c4c88

Please sign in to comment.