From 8544f7a8ffd61fa3f535cc676194e2f7444b5151 Mon Sep 17 00:00:00 2001 From: matt-farmer Date: Wed, 17 Aug 2016 11:42:56 +1000 Subject: [PATCH] Me bus slice iterator error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Unpredictable behaviour (‘duplicated’ output messages, correct number but overwriting existing messages) when no explicit assignment to iterated item. Fixed. Added comments to main toml file. --- harness/nias.toml | 37 ++++++++++++++++++++++++++++++++++--- lib/distributor.go | 20 ++++++++------------ lib/serviceregister.go | 4 ++++ lib/webserver.go | 9 +++++---- 4 files changed, 51 insertions(+), 19 deletions(-) diff --git a/harness/nias.toml b/harness/nias.toml index 649b898..7e201dc 100755 --- a/harness/nias.toml +++ b/harness/nias.toml @@ -1,3 +1,7 @@ +# ============================================================================ +# Basic Config +# +# ============================================================================ # Baseline year for DOB checks TestYear = "2017" @@ -9,9 +13,36 @@ ValidationRoute = ["schema", "id", "dob", "asl", "psi"] # Webserver port WebServerPort = "1325" -# Performance Parameters + + +# ============================================================================ +# Advanced parameters +# +# ============================================================================ + +# ============================================================================ +# Poolsize; number of worker processes to run concurrently PoolSize = 10 -# "MEM | NATS" | "STAN" are alternatives depending on deployment model -MsgTransport = "MEM" +# ============================================================================ +# ============================================================================ +# Memory Bus model +# "MEM | NATS" | "STAN" are alternatives depending on deployment type +# +# Use "MEM" for standalone low capacity machines and most Windows machines +# which have caps on tcp traffic +# +# Use NATS on capable machines and servers, and is preferred bus for Mac & Linux +# is also used as default bus for clustered installations +# +# Use STAN on same machines as NATS, but when persistent message streams are +# required - i.e. clients may connect after work has completed +# +# Default build from NSIP is issued with "MEM" selected as majority of standalone +# users are on Windows. If on Mac/Linux change to NATS for better performance. +# +# STAN streaming server is issued as sperate tool, please contact NSIP if required. +# Remember to set -cluster_id of streaming server to 'nias' if used. +MsgTransport = "MEM" +# ============================================================================ diff --git a/lib/distributor.go b/lib/distributor.go index 641da74..0c4ae94 100755 --- a/lib/distributor.go +++ b/lib/distributor.go @@ -8,7 +8,7 @@ package nias2 import ( "github.com/nats-io/go-nats-streaming" - "log" + // "log" ) type Distributor struct{} @@ -41,7 +41,8 @@ func (d *Distributor) RunSTANBus(poolsize int) { msg := DecodeNiasMessage(m.Data) responses := sr.ProcessByRoute(msg) for _, response := range responses { - pc.srvc_out_conn.Publish(pc.store_in_subject, EncodeNiasMessage(&response)) + r := response + pc.srvc_out_conn.Publish(pc.store_in_subject, EncodeNiasMessage(&r)) } ms.IncrementTracker(msg.TxID) }) @@ -65,14 +66,10 @@ func (d *Distributor) RunNATSBus(poolsize int) { for i := 0; i < poolsize; i++ { - i := i - sr := NewServiceRegister() pc := NewNATSProcessChain() ms := NewMessageStore() - log.Printf("Process Chain %d\n%#v", i, pc) - // create storage handler go func(pc NATSProcessChain, ms *MessageStore) { @@ -88,7 +85,8 @@ func (d *Distributor) RunNATSBus(poolsize int) { pc.srvc_in_conn.Subscribe(pc.srvc_in_subject, func(m *NiasMessage) { responses := sr.ProcessByRoute(m) for _, response := range responses { - pc.srvc_out_conn.Publish(pc.store_in_subject, response) + r := response + pc.srvc_out_conn.Publish(pc.store_in_subject, r) } ms.IncrementTracker(m.TxID) }) @@ -112,14 +110,10 @@ func (d *Distributor) RunMemBus(poolsize int) { for i := 0; i < poolsize; i++ { - i := i - sr := NewServiceRegister() pc := NewMemProcessChain() ms := NewMessageStore() - log.Printf("Process Chain %d\n%#v", i, pc) - // create storage handler go func(pc MemProcessChain, ms *MessageStore) { @@ -135,9 +129,11 @@ func (d *Distributor) RunMemBus(poolsize int) { for { msg := <-pc.req_chan + // log.Printf("\t\tservice handler recieved msg: %+v", msg) responses := sr.ProcessByRoute(msg) for _, response := range responses { - pc.store_chan <- &response + r := response + pc.store_chan <- &r } ms.IncrementTracker(msg.TxID) } diff --git a/lib/serviceregister.go b/lib/serviceregister.go index 198930f..581913b 100644 --- a/lib/serviceregister.go +++ b/lib/serviceregister.go @@ -99,6 +99,8 @@ func (sr *ServiceRegister) ProcessByRoute(m *NiasMessage) []NiasMessage { route := m.Route + // log.Printf("\t\tservice register recieved msg: %+v", m) + for _, sname := range route { // retrieve service from registry & execute @@ -109,6 +111,7 @@ func (sr *ServiceRegister) ProcessByRoute(m *NiasMessage) []NiasMessage { log.Println("\t", err) } else { // pass the responses to the message store + // log.Printf("\t\tservice %s returned %d responses: %+v", sname, len(responses), responses) for _, r := range responses { response := r response.Source = sname @@ -117,6 +120,7 @@ func (sr *ServiceRegister) ProcessByRoute(m *NiasMessage) []NiasMessage { } } + // log.Printf("\t\tresponse messages: %+v", response_msgs) return response_msgs } diff --git a/lib/webserver.go b/lib/webserver.go index 3b5cbf6..3dff8d1 100644 --- a/lib/webserver.go +++ b/lib/webserver.go @@ -149,9 +149,6 @@ func enqueueXML(file multipart.File) (IngestResponse, error) { msg.Target = VALIDATION_PREFIX msg.Route = VALIDATION_ROUTE - // xml_ec.Publish(REQUEST_TOPIC, msg) - // xml_conn.Publish(REQUEST_TOPIC, EncodeNiasMessage(msg)) - // req_chan <- msg publish(msg) } @@ -175,7 +172,11 @@ func (nws *NIASWebServer) Run() { case "NATS": req_ec = CreateNATSConnection() case "STAN": - req_conn, _ = stan.Connect(NIAS_CLUSTER_ID, nuid.Next()) + var stan_err error + req_conn, stan_err = stan.Connect(NIAS_CLUSTER_ID, nuid.Next()) + if stan_err != nil { + log.Fatalf("Unable to connect to STAN server with cluster id: %s\nError:%s\nService aborting...", NIAS_CLUSTER_ID, stan_err) + } } log.Println("Initialising uuid generator")