diff --git a/harness/harness.go b/harness/harness.go index 2107101..2a4a38e 100644 --- a/harness/harness.go +++ b/harness/harness.go @@ -25,19 +25,18 @@ func main() { log.Println("Starting distributor....") dist := nias2.Distributor{} - go dist.Run(poolsize) + switch nias2.NiasConfig.MsgTransport { + case "MEM": + go dist.RunMemBus(poolsize) + case "NATS": + go dist.RunNATSBus(poolsize) + case "STAN": + go dist.RunSTANBus(poolsize) + default: + go dist.RunMemBus(poolsize) + } log.Println("...Distributor running") - log.Println("Starting storage agent...") - msg_store := &nias2.MessageStore{} - go msg_store.Run(poolsize) - log.Println("...Storage engine running") - - log.Println("Starting progress tracker...") - msg_trk := &nias2.MessageTracker{} - go msg_trk.Run(poolsize) - log.Println("...progress tracker running") - log.Println("Starting web services...") ws := &nias2.NIASWebServer{} go ws.Run() diff --git a/harness/nias.toml b/harness/nias.toml index 3eaa8e9..2f2badc 100755 --- a/harness/nias.toml +++ b/harness/nias.toml @@ -9,5 +9,9 @@ ValidationRoute = ["schema", "id", "dob", "asl"] # Webserver port WebServerPort = "1325" -# Number of validation engines -PoolSize = 4 \ No newline at end of file +# Performance Parameters +PoolSize = 10 +# "MEM | NATS" | "STAN" are alternatives depending on deployment model +MsgTransport = "MEM" + + diff --git a/lib/config.go b/lib/config.go index 38cfa96..5c37359 100755 --- a/lib/config.go +++ b/lib/config.go @@ -12,7 +12,8 @@ type NIASConfig struct { TestYear string WebServerPort string ValidationRoute []string - PoolSize int + PoolSize int // number of service processors + MsgTransport string } var NiasConfig = loadDefaultConfig() diff --git a/lib/distributor.go b/lib/distributor.go index c3bce87..641da74 100755 --- a/lib/distributor.go +++ b/lib/distributor.go @@ -7,96 +7,155 @@ package nias2 import ( - "github.com/nats-io/nats" + "github.com/nats-io/go-nats-streaming" "log" ) type Distributor struct{} -func createServiceRegister() *ServiceRegister { +// creates a pool of message handlers which process the +// routing slip of each message thru the listed services - log.Println("Creating services & register") - sr := NewServiceRegister() +// Use STAN as message bus +func (d *Distributor) RunSTANBus(poolsize int) { - schema1, err := NewCoreSchemaService() - if err != nil { - log.Fatal("Unable to create schema service ", err) - } + for i := 0; i < poolsize; i++ { - schema2, err := NewCustomSchemaService("local.json") - if err != nil { - log.Fatal("Unable to create schema service ", err) - } + sr := NewServiceRegister() + pc := NewSTANProcessChain() + ms := NewMessageStore() - id1, err := NewIDService() - if err != nil { - log.Fatal("Unable to create id service ", err) - } + // create storage handler + go func(pc STANProcessChain, ms *MessageStore, id int) { - dob1, err := NewDOBService(NiasConfig.TestYear) - if err != nil { - log.Fatal("Unable to create dob service ", err) - } + pc.store_in_conn.Subscribe(pc.store_in_subject, func(m *stan.Msg) { + ms.StoreMessage(DecodeNiasMessage(m.Data)) + }) - asl1, err := NewASLService() - if err != nil { - log.Fatal("Unable to create asl service ", err) - } + }(pc, ms, i) //drop ids + + // create service handler + go func(pc STANProcessChain, sr *ServiceRegister, ms *MessageStore, id int) { + + pc.srvc_in_conn.Subscribe(pc.srvc_in_subject, func(m *stan.Msg) { + msg := DecodeNiasMessage(m.Data) + responses := sr.ProcessByRoute(msg) + for _, response := range responses { + pc.srvc_out_conn.Publish(pc.store_in_subject, EncodeNiasMessage(&response)) + } + ms.IncrementTracker(msg.TxID) + }) + + }(pc, sr, ms, i) - sr.AddService("schema", schema1) - sr.AddService("local", schema2) - sr.AddService("id", id1) - sr.AddService("dob", dob1) - sr.AddService("asl", asl1) + // create an inbound handler to multiplex validation requests, create last or will drop messages + go func(pc STANProcessChain, id int) { - log.Println("services created & installed in register") + pc.dist_in_conn.QueueSubscribe(pc.dist_in_subject, "distributor", func(m *stan.Msg) { + pc.dist_out_conn.Publish(pc.dist_out_subject, m.Data) + }) - return sr + }(pc, i) + } } -func serviceHandler(m *NiasMessage, sr *ServiceRegister, dist_ec *nats.EncodedConn) { - - route := m.Route - - for _, sname := range route { - - // retrieve service from registry & execute - srvc := sr.FindService(sname) - responses, err := srvc.HandleMessage(m) - if err != nil { - log.Println("\t *** got an error on service handler " + sname + " ***") - log.Println("\t", err) - } else { - // pass the responses to the message store - for _, r := range responses { - response := r - response.Source = sname - err := dist_ec.Publish(STORE_TOPIC, response) - if err != nil { - log.Println("Error saving response to message store: ", err) +// Use regular NATS as message bus +func (d *Distributor) RunNATSBus(poolsize int) { + + for i := 0; i < poolsize; i++ { + + i := i + + sr := NewServiceRegister() + pc := NewNATSProcessChain() + ms := NewMessageStore() + + log.Printf("Process Chain %d\n%#v", i, pc) + + // create storage handler + go func(pc NATSProcessChain, ms *MessageStore) { + + pc.store_in_conn.Subscribe(pc.store_in_subject, func(m *NiasMessage) { + ms.StoreMessage(m) + }) + + }(pc, ms) + + // create service handler + go func(pc NATSProcessChain, sr *ServiceRegister, ms *MessageStore) { + + pc.srvc_in_conn.Subscribe(pc.srvc_in_subject, func(m *NiasMessage) { + responses := sr.ProcessByRoute(m) + for _, response := range responses { + pc.srvc_out_conn.Publish(pc.store_in_subject, response) } - } - } - } - // update the progress tracker - err := dist_ec.Publish(TRACK_TOPIC, m.TxID) - if err != nil { - log.Println("Error saving tracking data: ", err) - } + ms.IncrementTracker(m.TxID) + }) + + }(pc, sr, ms) + + // create an inbound handler to multiplex validation requests, create last or will drop messages + go func(pc NATSProcessChain) { + + pc.dist_in_conn.QueueSubscribe(pc.dist_in_subject, "distributor", func(m *NiasMessage) { + pc.dist_out_conn.Publish(pc.dist_out_subject, m) + }) + + }(pc) + } } -// creates a pool of message handlers which process the -// routing slip of each message thru the listed services -func (d *Distributor) Run(poolsize int) { +// use internal channels as message bus +func (d *Distributor) RunMemBus(poolsize int) { for i := 0; i < poolsize; i++ { - sr := createServiceRegister() - dist_ec := CreateNATSConnection() - dist_ec.QueueSubscribe(REQUEST_TOPIC, "distributor", func(m *NiasMessage) { - serviceHandler(m, sr, dist_ec) - }) + i := i + + sr := NewServiceRegister() + pc := NewMemProcessChain() + ms := NewMessageStore() + + log.Printf("Process Chain %d\n%#v", i, pc) + + // create storage handler + go func(pc MemProcessChain, ms *MessageStore) { + + for { + msg := <-pc.store_chan + ms.StoreMessage(msg) + } + + }(pc, ms) + + // create service handler + go func(pc MemProcessChain, sr *ServiceRegister, ms *MessageStore) { + + for { + msg := <-pc.req_chan + responses := sr.ProcessByRoute(msg) + for _, response := range responses { + pc.store_chan <- &response + } + ms.IncrementTracker(msg.TxID) + } + + }(pc, sr, ms) + } + } + +// +// +// +// +// +// +// +// +// +// +// diff --git a/lib/ledis.go b/lib/ledis.go index 817b4da..8053695 100755 --- a/lib/ledis.go +++ b/lib/ledis.go @@ -14,8 +14,9 @@ const DEF_ADDRESS = "127.0.0.1:6380" func CreateLedisConnection() *goredis.Client { - log.Println("Creating ledis client") c := goredis.NewClient(DEF_ADDRESS, "") + // connection pool size within the client + c.SetMaxIdleConns(12) return c } @@ -26,11 +27,11 @@ func LaunchLedisServer() { cfg.LevelDB.CacheSize = 524288000 cfg.LevelDB.WriteBufferSize = 67108864 - // cfg.LevelDB.MaxOpenFiles = 10240 + cfg.LevelDB.MaxOpenFiles = 10240 // log.Println("\tLDB MxOF is: ", cfg.LevelDB.MaxOpenFiles) - cfg.ConnReadBufferSize = 1024 - cfg.ConnWriteBufferSize = 1024 + cfg.ConnReadBufferSize = (1024 * 1024 * 5) + cfg.ConnWriteBufferSize = (10240 * 1024 * 5) // log.Println("\tRead buffer is", cfg.ConnReadBufferSize) // log.Println("\tWrite buffer is", cfg.ConnWriteBufferSize) diff --git a/lib/nats.go b/lib/nats.go index 987d1df..a08987a 100755 --- a/lib/nats.go +++ b/lib/nats.go @@ -3,7 +3,9 @@ package nias2 import ( + "github.com/nats-io/go-nats-streaming" "github.com/nats-io/nats" + "github.com/nats-io/nuid" "log" ) @@ -13,7 +15,112 @@ import ( const REQUEST_TOPIC = "requests" const STORE_TOPIC = "store" const TRACK_TOPIC = "track" +const NIAS_CLUSTER_ID = "nias" +var req_chan = make(chan *NiasMessage, 1) + +/* +ProcessChains provide conceptual links between the major +components of nias: + +inbound (distributor/dist) connections + represetnitng entry points for data such as a web gateway or file reader + +service (srvc) connections + multiplexing inbound data to service handlers + +storage (store) connections + feeding processed data to storage; streams or database + +*/ + +// ProcessChain when running with NATS Streaming Server, known as STAN +type STANProcessChain struct { + dist_in_conn stan.Conn + dist_in_subject string + dist_out_conn stan.Conn + dist_out_subject string + + srvc_in_conn stan.Conn + srvc_in_subject string + srvc_out_conn stan.Conn + srvc_out_subject string + + store_in_conn stan.Conn + store_in_subject string +} + +// ProcessChain for running with standard NATS +type NATSProcessChain struct { + dist_in_conn *nats.EncodedConn + dist_in_subject string + dist_out_conn *nats.EncodedConn + dist_out_subject string + + srvc_in_conn *nats.EncodedConn + srvc_in_subject string + srvc_out_conn *nats.EncodedConn + srvc_out_subject string + + store_in_conn *nats.EncodedConn + store_in_subject string +} + +// ProcessChain for running in memory - useful for standaloe use +// and for resource constrained enviornments, use of blocking channels accross +// the chain means solution will balance processing accross the chain based on +// speed of host +type MemProcessChain struct { + req_chan chan *NiasMessage + srvc_chan chan *NiasMessage + store_chan chan *NiasMessage +} + +func NewMemProcessChain() MemProcessChain { + mpc, _ := createMemProcessChain() + return mpc +} + +func createMemProcessChain() (MemProcessChain, error) { + + pc := MemProcessChain{} + + pc.req_chan = req_chan + pc.srvc_chan = make(chan *NiasMessage, 1) + pc.store_chan = make(chan *NiasMessage, 1) + + return pc, nil + +} + +func NewNATSProcessChain() NATSProcessChain { + npc, _ := createNATSProcessChain() + return npc +} + +func createNATSProcessChain() (NATSProcessChain, error) { + + pc := NATSProcessChain{} + + distID := nuid.Next() + pc.dist_in_conn = CreateNATSConnection() + pc.dist_out_conn = CreateNATSConnection() + pc.dist_in_subject = REQUEST_TOPIC + pc.dist_out_subject = distID + + srvcID := nuid.Next() + pc.srvc_in_conn = CreateNATSConnection() + pc.srvc_out_conn = CreateNATSConnection() + pc.srvc_in_subject = distID + pc.srvc_out_subject = srvcID + + pc.store_in_conn = CreateNATSConnection() + pc.store_in_subject = srvcID + + return pc, nil +} + +// helper function to provide encoded connections for standard NATA func CreateNATSConnection() *nats.EncodedConn { nc, err := nats.Connect(nats.DefaultURL) ec, err := nats.NewEncodedConn(nc, nats.GOB_ENCODER) @@ -23,3 +130,30 @@ func CreateNATSConnection() *nats.EncodedConn { return ec } + +func NewSTANProcessChain() STANProcessChain { + spc, _ := createSTANProcessChain() + return spc +} + +func createSTANProcessChain() (STANProcessChain, error) { + + pc := STANProcessChain{} + + distID := nuid.Next() + pc.dist_in_conn, _ = stan.Connect(NIAS_CLUSTER_ID, nuid.Next()) + pc.dist_out_conn, _ = stan.Connect(NIAS_CLUSTER_ID, nuid.Next()) + pc.dist_in_subject = REQUEST_TOPIC + pc.dist_out_subject = distID + + srvcID := nuid.Next() + pc.srvc_in_conn, _ = stan.Connect(NIAS_CLUSTER_ID, nuid.Next()) + pc.srvc_out_conn, _ = stan.Connect(NIAS_CLUSTER_ID, nuid.Next()) + pc.srvc_in_subject = distID + pc.srvc_out_subject = srvcID + + pc.store_in_conn, _ = stan.Connect(NIAS_CLUSTER_ID, nuid.Next()) + pc.store_in_subject = srvcID + + return pc, nil +} diff --git a/lib/serviceregister.go b/lib/serviceregister.go index 61428d8..b977e77 100755 --- a/lib/serviceregister.go +++ b/lib/serviceregister.go @@ -5,6 +5,7 @@ package nias2 // will be matched against required tasks in NasMessage.Route meta-data import ( + "log" "sync" ) @@ -18,9 +19,7 @@ type ServiceRegister struct { // creates a ServiceRegister with properly initilaised internal map // processing services are stored with a name and the referenced NiasService func NewServiceRegister() *ServiceRegister { - reg := ServiceRegister{} - reg.registry = make(map[string]NiasService) - return ® + return createDefaultServiceRegister() } // add a service to the registry with a name @@ -43,3 +42,75 @@ func (sr *ServiceRegister) FindService(servicename string) NiasService { defer sr.RUnlock() return sr.registry[servicename] } + +// build register with default set of services +func createDefaultServiceRegister() *ServiceRegister { + + log.Println("Creating services & register") + sr := ServiceRegister{} + sr.registry = make(map[string]NiasService) + + schema1, err := NewCoreSchemaService() + if err != nil { + log.Fatal("Unable to create schema service ", err) + } + + schema2, err := NewCustomSchemaService("local.json") + if err != nil { + log.Fatal("Unable to create schema service ", err) + } + + id1, err := NewIDService() + if err != nil { + log.Fatal("Unable to create id service ", err) + } + + dob1, err := NewDOBService(NiasConfig.TestYear) + if err != nil { + log.Fatal("Unable to create dob service ", err) + } + + asl1, err := NewASLService() + if err != nil { + log.Fatal("Unable to create asl service ", err) + } + + sr.AddService("schema", schema1) + sr.AddService("local", schema2) + sr.AddService("id", id1) + sr.AddService("dob", dob1) + sr.AddService("asl", asl1) + + log.Println("services created & installed in register") + + return &sr + +} + +func (sr *ServiceRegister) ProcessByRoute(m *NiasMessage) []NiasMessage { + + response_msgs := make([]NiasMessage, 0) + + route := m.Route + + for _, sname := range route { + + // retrieve service from registry & execute + srvc := sr.FindService(sname) + responses, err := srvc.HandleMessage(m) + if err != nil { + log.Println("\t *** got an error on service handler " + sname + " ***") + log.Println("\t", err) + } else { + // pass the responses to the message store + for _, r := range responses { + response := r + response.Source = sname + response_msgs = append(response_msgs, response) + } + } + } + + return response_msgs + +} diff --git a/lib/store.go b/lib/store.go index bb3f8c5..cc38048 100755 --- a/lib/store.go +++ b/lib/store.go @@ -15,11 +15,17 @@ const VALIDATION_PREFIX = "nvr:" // MessageStore listens for messages on the store topic and captures them // in ledis as lists (persistent qs in effect), messages can be stored on transaction // and use case basis - use case being the superset of all transactions of a given type -type MessageStore struct{} +type MessageStore struct { + C *goredis.Client + trkC *goredis.Client +} -// MessageTracker listens for progress updates from services and -// updates progress monitor in ledis -type MessageTracker struct{} +func NewMessageStore() *MessageStore { + ms := MessageStore{ + C: CreateLedisConnection(), + } + return &ms +} // track status in simple hash counter; key is transactionid, value is no. records processed var status = make(map[string]int) @@ -27,54 +33,35 @@ var status = make(map[string]int) // mutex to protect status hash for concurrent updates var mutex = &sync.Mutex{} -// launch a set of storage agents that will listen for messages to store -func (ms *MessageStore) Run(poolsize int) { - - ms_ec := CreateNATSConnection() - for i := 0; i < poolsize; i++ { - c := CreateLedisConnection() - // defer c.Close() - ms_ec.QueueSubscribe(STORE_TOPIC, "msg_store", func(m *NiasMessage) { - // don't store if no content - if m.Body != nil { - - // store for txaction - tx_key := m.Target + m.TxID - _, err := c.Do("rpush", tx_key, EncodeNiasMessage(m)) - if err != nil { - log.Println("error saving message:tx: - ", err) - } - - // store for use case - disabled for now - store_usecase := false - if store_usecase { - uc_key := m.Target - _, err := c.Do("rpush", uc_key, EncodeNiasMessage(m)) - if err != nil { - log.Println("error saving message:uc - ", err) - } - } - - } - - }) - } -} +// put message into ledis store +// endcode converts nias message to byte array for storage +func (ms *MessageStore) StoreMessage(m *NiasMessage) { -// create a set of progress trackers to monitor services' progress -func (mt *MessageTracker) Run(poolsize int) { + // store for txaction + tx_key := m.Target + m.TxID + _, err := ms.C.Do("rpush", tx_key, EncodeNiasMessage(m)) + if err != nil { + log.Println("error saving message:tx: - ", err) + } - mt_ec := CreateNATSConnection() - for i := 0; i < poolsize; i++ { + // store for use case - disabled for now - in config + store_usecase := false + if store_usecase { + uc_key := m.Target + _, err := ms.C.Do("rpush", uc_key, EncodeNiasMessage(m)) + if err != nil { + log.Println("error saving message:uc - ", err) + } + } - mt_ec.QueueSubscribe(TRACK_TOPIC, "msg_tracker", func(txid string) { +} - mutex.Lock() - status[txid]++ - mutex.Unlock() +// update the progress of the validation transaction +func (ms *MessageStore) IncrementTracker(txid string) { - }) - } + mutex.Lock() + status[txid]++ + mutex.Unlock() } @@ -123,8 +110,7 @@ func GetTrackingData(txid string) map[string]string { return trackmap } -// binary encding for messages going to internal q/store, in nats qs this is -// handled automatically by the use of gob encoder on connection +// binary encding for messages going to internal q/store. func EncodeNiasMessage(msg *NiasMessage) []byte { encBuf := new(bytes.Buffer) @@ -137,8 +123,7 @@ func EncodeNiasMessage(msg *NiasMessage) []byte { } -// binary encding for messages coming from internal q/store, in nats qs this is -// handled automatically by the use of gob encoder on connection +// binary decoding for messages coming from internal q/store. func DecodeNiasMessage(bytemsg []uint8) *NiasMessage { decBuf := bytes.NewBuffer(bytemsg) diff --git a/lib/webserver.go b/lib/webserver.go index 6bd1dc6..5938c90 100755 --- a/lib/webserver.go +++ b/lib/webserver.go @@ -9,25 +9,27 @@ import ( "encoding/xml" "github.com/labstack/echo" //"github.com/labstack/echo/engine/fasthttp" - "html/template" - "log" - "path" - "strconv" - "strings" - "time" "github.com/labstack/echo/engine/standard" mw "github.com/labstack/echo/middleware" ms "github.com/mitchellh/mapstructure" + "github.com/nats-io/go-nats-streaming" "github.com/nats-io/nats" "github.com/nats-io/nuid" "github.com/twinj/uuid" "github.com/wildducktheories/go-csv" + "html/template" + "log" "mime/multipart" "net/http" + "path" + "strconv" + "strings" + "time" ) var VALIDATION_ROUTE = NiasConfig.ValidationRoute -var web_ec *nats.EncodedConn +var req_ec *nats.EncodedConn +var req_conn stan.Conn // rendering template for csv-xml conversion var sptmpl *template.Template @@ -46,15 +48,31 @@ type IngestResponse struct { // dataset func removeBlanks(m map[string]string) map[string]string { - reducedmap := make(map[string]string) - for key, val := range m { - if val != "" { - reducedmap[key] = val - } - } - return reducedmap + reducedmap := make(map[string]string) + for key, val := range m { + if val != "" { + reducedmap[key] = val + } + } + return reducedmap } +// generic publish routine that handles different requirements +// of the 3 possible message infrastrucutres +func publish(msg *NiasMessage) { + + switch NiasConfig.MsgTransport { + case "MEM": + req_chan <- msg + case "NATS": + req_ec.Publish(REQUEST_TOPIC, msg) + case "STAN": + req_conn.Publish(REQUEST_TOPIC, EncodeNiasMessage(msg)) + default: + req_chan <- msg + } + +} // read csv file as stream an post records onto processing queue func enqueueCSV(file multipart.File) (IngestResponse, error) { @@ -72,13 +90,10 @@ func enqueueCSV(file multipart.File) (IngestResponse, error) { regr := RegistrationRecord{} r := removeBlanks(record.AsMap()) - // log.Printf("record is:\n%v\n", r) - //decode_err := ms.Decode(record.AsMap(), ®r) decode_err := ms.Decode(r, ®r) if decode_err != nil { return ir, decode_err } - // log.Printf("regr is:\n%v\n", regr) msg := &NiasMessage{} msg.Body = regr @@ -88,7 +103,8 @@ func enqueueCSV(file multipart.File) (IngestResponse, error) { msg.Target = VALIDATION_PREFIX msg.Route = VALIDATION_ROUTE - web_ec.Publish(REQUEST_TOPIC, msg) + publish(msg) + } ir.Records = i @@ -133,7 +149,10 @@ func enqueueXML(file multipart.File) (IngestResponse, error) { msg.Target = VALIDATION_PREFIX msg.Route = VALIDATION_ROUTE - web_ec.Publish(REQUEST_TOPIC, msg) + // xml_ec.Publish(REQUEST_TOPIC, msg) + // xml_conn.Publish(REQUEST_TOPIC, EncodeNiasMessage(msg)) + // req_chan <- msg + publish(msg) } default: @@ -151,8 +170,13 @@ func enqueueXML(file multipart.File) (IngestResponse, error) { // start the server func (nws *NIASWebServer) Run() { - log.Println("Connecting to NATS server") - web_ec = CreateNATSConnection() + log.Println("Connecting to message bus") + switch NiasConfig.MsgTransport { + case "NATS": + req_ec = CreateNATSConnection() + case "STAN": + req_conn, _ = stan.Connect(NIAS_CLUSTER_ID, nuid.Next()) + } log.Println("Initialising uuid generator") config := uuid.StateSaverConfig{SaveReport: true, SaveSchedule: 30 * time.Minute}