Skip to content

Commit

Permalink
Merge pull request #68 from XDagger/develop
Browse files Browse the repository at this point in the history
fixed bug
  • Loading branch information
swordlet authored Aug 18, 2023
2 parents dafed9f + 3eb406d commit c669b0e
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 134 deletions.
14 changes: 13 additions & 1 deletion server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@ import (
"log"
"os"

// _ "net/http/pprof"

"github.com/swordlet/xmrig2xdag/config"
"github.com/swordlet/xmrig2xdag/logger"
"github.com/swordlet/xmrig2xdag/proxy"
"github.com/swordlet/xmrig2xdag/tcp"
)

// func HelloWorld(w http.ResponseWriter, r *http.Request) {
// fmt.Fprintln(w, "hello world")
// }

var (
version = "2.0.5"
version = "2.0.6"

// cmd line options
configFile *string
Expand Down Expand Up @@ -103,4 +109,10 @@ func main() {
printWelcomeMessage()

<-holdOpen
// http.HandleFunc("/", HelloWorld)

// err := http.ListenAndServe(":8089", nil)
// if err != nil {
// fmt.Println(err)
// }
}
121 changes: 3 additions & 118 deletions server/proxy/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/swordlet/xmrig2xdag/config"
"github.com/swordlet/xmrig2xdag/logger"
"github.com/swordlet/xmrig2xdag/utils"
)

var (
Expand All @@ -19,7 +20,7 @@ type Director struct {
statInterval time.Duration

currentProxyID atomic.Uint64
proxies *SafeMap
proxies *utils.SafeMap

// stat tracking only
lastTotalShares uint64
Expand All @@ -37,7 +38,7 @@ func newDirector() *Director {
d := &Director{
aliveSince: time.Now(),
statInterval: time.Duration(config.Get().StatInterval) * time.Second,
proxies: NewSafeMap(),
proxies: utils.NewSafeMap(),
}
go d.run()

Expand Down Expand Up @@ -127,119 +128,3 @@ func (d *Director) GetStats() *Stats {

return stats
}

// https://github.com/zeromicro/go-zero/blob/master/core/collection/safemap.go
const (
copyThreshold = 1000
maxDeletion = 10000
)

// SafeMap provides a map alternative to avoid memory leak.
// This implementation is not needed until issue below fixed.
// https://github.com/golang/go/issues/20135
type SafeMap struct {
lock sync.RWMutex
deletionOld int
deletionNew int
dirtyOld map[uint64]interface{}
dirtyNew map[uint64]interface{}
}

// NewSafeMap returns a SafeMap.
func NewSafeMap() *SafeMap {
return &SafeMap{
dirtyOld: make(map[uint64]interface{}),
dirtyNew: make(map[uint64]interface{}),
}
}

// Del deletes the value with the given key from m.
func (m *SafeMap) Del(key uint64) {
m.lock.Lock()
if _, ok := m.dirtyOld[key]; ok {
m.dirtyOld[key] = nil
delete(m.dirtyOld, key)
m.deletionOld++
} else if _, ok := m.dirtyNew[key]; ok {
m.dirtyNew[key] = nil
delete(m.dirtyNew, key)
m.deletionNew++
}
if m.deletionOld >= maxDeletion && len(m.dirtyOld) < copyThreshold {
for k, v := range m.dirtyOld {
m.dirtyNew[k] = v
}
m.dirtyOld = m.dirtyNew
m.deletionOld = m.deletionNew
m.dirtyNew = make(map[uint64]interface{})
m.deletionNew = 0
}
if m.deletionNew >= maxDeletion && len(m.dirtyNew) < copyThreshold {
for k, v := range m.dirtyNew {
m.dirtyOld[k] = v
}
m.dirtyNew = make(map[uint64]interface{})
m.deletionNew = 0
}
m.lock.Unlock()
}

// Get gets the value with the given key from m.
func (m *SafeMap) Get(key uint64) (interface{}, bool) {
m.lock.RLock()
defer m.lock.RUnlock()

if val, ok := m.dirtyOld[key]; ok {
return val, true
}

val, ok := m.dirtyNew[key]
return val, ok
}

// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
func (m *SafeMap) Range(f func(key, val interface{}) bool) {
m.lock.RLock()
defer m.lock.RUnlock()

for k, v := range m.dirtyOld {
if !f(k, v) {
return
}
}
for k, v := range m.dirtyNew {
if !f(k, v) {
return
}
}
}

// Set sets the value into m with the given key.
func (m *SafeMap) Set(key uint64, value interface{}) {
m.lock.Lock()
if m.deletionOld <= maxDeletion {
if _, ok := m.dirtyNew[key]; ok {
m.dirtyNew[key] = nil
delete(m.dirtyNew, key)
m.deletionNew++
}
m.dirtyOld[key] = value
} else {
if _, ok := m.dirtyOld[key]; ok {
m.dirtyOld[key] = nil
delete(m.dirtyOld, key)
m.deletionOld++
}
m.dirtyNew[key] = value
}
m.lock.Unlock()
}

// Size returns the size of m.
func (m *SafeMap) Size() int {
m.lock.RLock()
size := len(m.dirtyOld) + len(m.dirtyNew)
m.lock.RUnlock()
return size
}
11 changes: 11 additions & 0 deletions server/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ func (p *Proxy) shutdown(cl int) {
p.SS = nil
p.director = nil
close(p.notify)
close(p.submissions)
}
p.Conn = nil
p.isClosed = true
Expand All @@ -436,6 +437,8 @@ func (p *Proxy) Close() {
p.SS = nil
p.director = nil
close(p.notify)
close(p.submissions)

}
p.Conn = nil
p.isClosed = true
Expand Down Expand Up @@ -535,6 +538,12 @@ func (p *Proxy) handleSubmit(s *share) (err error) {

// Submit sends worker shares to the pool. Safe for concurrent use.
func (p *Proxy) Submit(params map[string]interface{}) (*StatusReply, error) {
p.connMu.Lock()
defer p.connMu.Unlock()

if p.isClosed {
return nil, ErrBadJobID
}
s := newShare(params)

if s.JobID == "" {
Expand Down Expand Up @@ -596,6 +605,8 @@ func (p *Proxy) Remove(w Worker) {
logger.Get().Printf("Proxy[%d] removed", p.ID)
p.Conn = nil
close(p.done)
close(p.notify)
close(p.submissions)
p.isClosed = true
}

Expand Down
16 changes: 10 additions & 6 deletions server/stratum/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

"github.com/powerman/rpc-codec/jsonrpc2"
"github.com/swordlet/xmrig2xdag/utils"
)

// DefaultServerCodec handles xmr stratum+tcp requests and is capabable of sending a notification to
Expand All @@ -25,7 +26,7 @@ type DefaultServerCodec struct {
// the response to find the original request ID.
mutex sync.Mutex // protects seq, pending
seq uint64
pending map[uint64]*json.RawMessage
pending *utils.SafeMap //[uint64]*json.RawMessage
}

type defaultNotification struct {
Expand All @@ -43,7 +44,7 @@ func NewDefaultServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
c: conn,
ctx: context.Background(),
},
pending: make(map[uint64]*json.RawMessage),
pending: utils.NewSafeMap(), //(map[uint64]*json.RawMessage),
}
}

Expand Down Expand Up @@ -84,7 +85,8 @@ func (c *DefaultServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
// internal uint64 and save JSON on the side.
c.mutex.Lock()
c.seq++
c.pending[c.seq] = c.req.ID
// c.pending[c.seq] = c.req.ID
c.pending.Set(c.seq, c.req.ID)
c.req.ID = nil
r.Seq = c.seq
c.mutex.Unlock()
Expand Down Expand Up @@ -112,19 +114,21 @@ func (c *DefaultServerCodec) ReadRequestBody(x interface{}) error {
// WriteResponse implements rpc.ServerCodec
func (c *DefaultServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
c.mutex.Lock()
b, ok := c.pending[r.Seq]
// b, ok := c.pending[r.Seq]
b, ok := c.pending.Get(r.Seq)
if !ok {
c.mutex.Unlock()
return errors.New("invalid sequence number in response")
}
delete(c.pending, r.Seq)
// delete(c.pending, r.Seq)
c.pending.Del(r.Seq)
c.mutex.Unlock()

if b == nil {
// Notification. Do not respond.
return nil
}
resp := serverResponse{Version: "2.0", ID: b}
resp := serverResponse{Version: "2.0", ID: b.(*json.RawMessage)}
if r.Error == "" {
if x == nil {
resp.Result = &null
Expand Down
119 changes: 119 additions & 0 deletions server/utils/safemap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package utils

import "sync"

// https://github.com/zeromicro/go-zero/blob/master/core/collection/safemap.go
const (
copyThreshold = 1000
maxDeletion = 10000
)

// SafeMap provides a map alternative to avoid memory leak.
// This implementation is not needed until issue below fixed.
// https://github.com/golang/go/issues/20135
type SafeMap struct {
lock sync.RWMutex
deletionOld int
deletionNew int
dirtyOld map[uint64]interface{}
dirtyNew map[uint64]interface{}
}

// NewSafeMap returns a SafeMap.
func NewSafeMap() *SafeMap {
return &SafeMap{
dirtyOld: make(map[uint64]interface{}),
dirtyNew: make(map[uint64]interface{}),
}
}

// Del deletes the value with the given key from m.
func (m *SafeMap) Del(key uint64) {
m.lock.Lock()
if _, ok := m.dirtyOld[key]; ok {
m.dirtyOld[key] = nil
delete(m.dirtyOld, key)
m.deletionOld++
} else if _, ok := m.dirtyNew[key]; ok {
m.dirtyNew[key] = nil
delete(m.dirtyNew, key)
m.deletionNew++
}
if m.deletionOld >= maxDeletion && len(m.dirtyOld) < copyThreshold {
for k, v := range m.dirtyOld {
m.dirtyNew[k] = v
}
m.dirtyOld = m.dirtyNew
m.deletionOld = m.deletionNew
m.dirtyNew = make(map[uint64]interface{})
m.deletionNew = 0
}
if m.deletionNew >= maxDeletion && len(m.dirtyNew) < copyThreshold {
for k, v := range m.dirtyNew {
m.dirtyOld[k] = v
}
m.dirtyNew = make(map[uint64]interface{})
m.deletionNew = 0
}
m.lock.Unlock()
}

// Get gets the value with the given key from m.
func (m *SafeMap) Get(key uint64) (interface{}, bool) {
m.lock.RLock()
defer m.lock.RUnlock()

if val, ok := m.dirtyOld[key]; ok {
return val, true
}

val, ok := m.dirtyNew[key]
return val, ok
}

// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
func (m *SafeMap) Range(f func(key, val interface{}) bool) {
m.lock.RLock()
defer m.lock.RUnlock()

for k, v := range m.dirtyOld {
if !f(k, v) {
return
}
}
for k, v := range m.dirtyNew {
if !f(k, v) {
return
}
}
}

// Set sets the value into m with the given key.
func (m *SafeMap) Set(key uint64, value interface{}) {
m.lock.Lock()
if m.deletionOld <= maxDeletion {
if _, ok := m.dirtyNew[key]; ok {
m.dirtyNew[key] = nil
delete(m.dirtyNew, key)
m.deletionNew++
}
m.dirtyOld[key] = value
} else {
if _, ok := m.dirtyOld[key]; ok {
m.dirtyOld[key] = nil
delete(m.dirtyOld, key)
m.deletionOld++
}
m.dirtyNew[key] = value
}
m.lock.Unlock()
}

// Size returns the size of m.
func (m *SafeMap) Size() int {
m.lock.RLock()
size := len(m.dirtyOld) + len(m.dirtyNew)
m.lock.RUnlock()
return size
}
Loading

0 comments on commit c669b0e

Please sign in to comment.