Skip to content

Commit

Permalink
Merge pull request #12 from nsip/self-balancing-multi-transport
Browse files Browse the repository at this point in the history
stable performance for non-unix systems
  • Loading branch information
matt-farmer authored Jul 15, 2016
2 parents 795a04a + 0df1b57 commit 8c0b439
Show file tree
Hide file tree
Showing 9 changed files with 438 additions and 160 deletions.
21 changes: 10 additions & 11 deletions harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,18 @@ func main() {

log.Println("Starting distributor....")
dist := nias2.Distributor{}
go dist.Run(poolsize)
switch nias2.NiasConfig.MsgTransport {
case "MEM":
go dist.RunMemBus(poolsize)
case "NATS":
go dist.RunNATSBus(poolsize)
case "STAN":
go dist.RunSTANBus(poolsize)
default:
go dist.RunMemBus(poolsize)
}
log.Println("...Distributor running")

log.Println("Starting storage agent...")
msg_store := &nias2.MessageStore{}
go msg_store.Run(poolsize)
log.Println("...Storage engine running")

log.Println("Starting progress tracker...")
msg_trk := &nias2.MessageTracker{}
go msg_trk.Run(poolsize)
log.Println("...progress tracker running")

log.Println("Starting web services...")
ws := &nias2.NIASWebServer{}
go ws.Run()
Expand Down
8 changes: 6 additions & 2 deletions harness/nias.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ ValidationRoute = ["schema", "id", "dob", "asl"]
# Webserver port
WebServerPort = "1325"

# Number of validation engines
PoolSize = 4
# Performance Parameters
PoolSize = 10
# "MEM | NATS" | "STAN" are alternatives depending on deployment model
MsgTransport = "MEM"


3 changes: 2 additions & 1 deletion lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ type NIASConfig struct {
TestYear string
WebServerPort string
ValidationRoute []string
PoolSize int
PoolSize int // number of service processors
MsgTransport string
}

var NiasConfig = loadDefaultConfig()
Expand Down
193 changes: 126 additions & 67 deletions lib/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,96 +7,155 @@
package nias2

import (
"github.com/nats-io/nats"
"github.com/nats-io/go-nats-streaming"
"log"
)

type Distributor struct{}

func createServiceRegister() *ServiceRegister {
// creates a pool of message handlers which process the
// routing slip of each message thru the listed services

log.Println("Creating services & register")
sr := NewServiceRegister()
// Use STAN as message bus
func (d *Distributor) RunSTANBus(poolsize int) {

schema1, err := NewCoreSchemaService()
if err != nil {
log.Fatal("Unable to create schema service ", err)
}
for i := 0; i < poolsize; i++ {

schema2, err := NewCustomSchemaService("local.json")
if err != nil {
log.Fatal("Unable to create schema service ", err)
}
sr := NewServiceRegister()
pc := NewSTANProcessChain()
ms := NewMessageStore()

id1, err := NewIDService()
if err != nil {
log.Fatal("Unable to create id service ", err)
}
// create storage handler
go func(pc STANProcessChain, ms *MessageStore, id int) {

dob1, err := NewDOBService(NiasConfig.TestYear)
if err != nil {
log.Fatal("Unable to create dob service ", err)
}
pc.store_in_conn.Subscribe(pc.store_in_subject, func(m *stan.Msg) {
ms.StoreMessage(DecodeNiasMessage(m.Data))
})

asl1, err := NewASLService()
if err != nil {
log.Fatal("Unable to create asl service ", err)
}
}(pc, ms, i) //drop ids

// create service handler
go func(pc STANProcessChain, sr *ServiceRegister, ms *MessageStore, id int) {

pc.srvc_in_conn.Subscribe(pc.srvc_in_subject, func(m *stan.Msg) {
msg := DecodeNiasMessage(m.Data)
responses := sr.ProcessByRoute(msg)
for _, response := range responses {
pc.srvc_out_conn.Publish(pc.store_in_subject, EncodeNiasMessage(&response))
}
ms.IncrementTracker(msg.TxID)
})

}(pc, sr, ms, i)

sr.AddService("schema", schema1)
sr.AddService("local", schema2)
sr.AddService("id", id1)
sr.AddService("dob", dob1)
sr.AddService("asl", asl1)
// create an inbound handler to multiplex validation requests, create last or will drop messages
go func(pc STANProcessChain, id int) {

log.Println("services created & installed in register")
pc.dist_in_conn.QueueSubscribe(pc.dist_in_subject, "distributor", func(m *stan.Msg) {
pc.dist_out_conn.Publish(pc.dist_out_subject, m.Data)
})

return sr
}(pc, i)

}
}

func serviceHandler(m *NiasMessage, sr *ServiceRegister, dist_ec *nats.EncodedConn) {

route := m.Route

for _, sname := range route {

// retrieve service from registry & execute
srvc := sr.FindService(sname)
responses, err := srvc.HandleMessage(m)
if err != nil {
log.Println("\t *** got an error on service handler " + sname + " ***")
log.Println("\t", err)
} else {
// pass the responses to the message store
for _, r := range responses {
response := r
response.Source = sname
err := dist_ec.Publish(STORE_TOPIC, response)
if err != nil {
log.Println("Error saving response to message store: ", err)
// Use regular NATS as message bus
func (d *Distributor) RunNATSBus(poolsize int) {

for i := 0; i < poolsize; i++ {

i := i

sr := NewServiceRegister()
pc := NewNATSProcessChain()
ms := NewMessageStore()

log.Printf("Process Chain %d\n%#v", i, pc)

// create storage handler
go func(pc NATSProcessChain, ms *MessageStore) {

pc.store_in_conn.Subscribe(pc.store_in_subject, func(m *NiasMessage) {
ms.StoreMessage(m)
})

}(pc, ms)

// create service handler
go func(pc NATSProcessChain, sr *ServiceRegister, ms *MessageStore) {

pc.srvc_in_conn.Subscribe(pc.srvc_in_subject, func(m *NiasMessage) {
responses := sr.ProcessByRoute(m)
for _, response := range responses {
pc.srvc_out_conn.Publish(pc.store_in_subject, response)
}
}
}
}
// update the progress tracker
err := dist_ec.Publish(TRACK_TOPIC, m.TxID)
if err != nil {
log.Println("Error saving tracking data: ", err)
}
ms.IncrementTracker(m.TxID)
})

}(pc, sr, ms)

// create an inbound handler to multiplex validation requests, create last or will drop messages
go func(pc NATSProcessChain) {

pc.dist_in_conn.QueueSubscribe(pc.dist_in_subject, "distributor", func(m *NiasMessage) {
pc.dist_out_conn.Publish(pc.dist_out_subject, m)
})

}(pc)

}
}

// creates a pool of message handlers which process the
// routing slip of each message thru the listed services
func (d *Distributor) Run(poolsize int) {
// use internal channels as message bus
func (d *Distributor) RunMemBus(poolsize int) {

for i := 0; i < poolsize; i++ {

sr := createServiceRegister()
dist_ec := CreateNATSConnection()
dist_ec.QueueSubscribe(REQUEST_TOPIC, "distributor", func(m *NiasMessage) {
serviceHandler(m, sr, dist_ec)
})
i := i

sr := NewServiceRegister()
pc := NewMemProcessChain()
ms := NewMessageStore()

log.Printf("Process Chain %d\n%#v", i, pc)

// create storage handler
go func(pc MemProcessChain, ms *MessageStore) {

for {
msg := <-pc.store_chan
ms.StoreMessage(msg)
}

}(pc, ms)

// create service handler
go func(pc MemProcessChain, sr *ServiceRegister, ms *MessageStore) {

for {
msg := <-pc.req_chan
responses := sr.ProcessByRoute(msg)
for _, response := range responses {
pc.store_chan <- &response
}
ms.IncrementTracker(msg.TxID)
}

}(pc, sr, ms)

}

}

//
//
//
//
//
//
//
//
//
//
//
9 changes: 5 additions & 4 deletions lib/ledis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ const DEF_ADDRESS = "127.0.0.1:6380"

func CreateLedisConnection() *goredis.Client {

log.Println("Creating ledis client")
c := goredis.NewClient(DEF_ADDRESS, "")
// connection pool size within the client
c.SetMaxIdleConns(12)
return c

}
Expand All @@ -26,11 +27,11 @@ func LaunchLedisServer() {
cfg.LevelDB.CacheSize = 524288000
cfg.LevelDB.WriteBufferSize = 67108864

// cfg.LevelDB.MaxOpenFiles = 10240
cfg.LevelDB.MaxOpenFiles = 10240
// log.Println("\tLDB MxOF is: ", cfg.LevelDB.MaxOpenFiles)

cfg.ConnReadBufferSize = 1024
cfg.ConnWriteBufferSize = 1024
cfg.ConnReadBufferSize = (1024 * 1024 * 5)
cfg.ConnWriteBufferSize = (10240 * 1024 * 5)

// log.Println("\tRead buffer is", cfg.ConnReadBufferSize)
// log.Println("\tWrite buffer is", cfg.ConnWriteBufferSize)
Expand Down
Loading

0 comments on commit 8c0b439

Please sign in to comment.