Skip to content

Commit

Permalink
Merge pull request #19 from nsip/mem-bus-checking
Browse files Browse the repository at this point in the history
Mem bus slice iterator error
  • Loading branch information
matt-farmer authored Aug 17, 2016
2 parents 7fb7d34 + 8544f7a commit 2752c8f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 19 deletions.
37 changes: 34 additions & 3 deletions harness/nias.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# ============================================================================
# Basic Config
#
# ============================================================================

# Baseline year for DOB checks
TestYear = "2017"
Expand All @@ -9,9 +13,36 @@ ValidationRoute = ["schema", "id", "dob", "asl", "psi"]
# Webserver port
WebServerPort = "1325"

# Performance Parameters


# ============================================================================
# Advanced parameters
#
# ============================================================================

# ============================================================================
# Poolsize; number of worker processes to run concurrently
PoolSize = 10
# "MEM | NATS" | "STAN" are alternatives depending on deployment model
MsgTransport = "MEM"
# ============================================================================

# ============================================================================
# Memory Bus model
# "MEM | NATS" | "STAN" are alternatives depending on deployment type
#
# Use "MEM" for standalone low capacity machines and most Windows machines
# which have caps on tcp traffic
#
# Use NATS on capable machines and servers, and is preferred bus for Mac & Linux
# is also used as default bus for clustered installations
#
# Use STAN on same machines as NATS, but when persistent message streams are
# required - i.e. clients may connect after work has completed
#
# Default build from NSIP is issued with "MEM" selected as majority of standalone
# users are on Windows. If on Mac/Linux change to NATS for better performance.
#
# STAN streaming server is issued as sperate tool, please contact NSIP if required.
# Remember to set -cluster_id of streaming server to 'nias' if used.

MsgTransport = "MEM"
# ============================================================================
20 changes: 8 additions & 12 deletions lib/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package nias2

import (
"github.com/nats-io/go-nats-streaming"
"log"
// "log"
)

type Distributor struct{}
Expand Down Expand Up @@ -41,7 +41,8 @@ func (d *Distributor) RunSTANBus(poolsize int) {
msg := DecodeNiasMessage(m.Data)
responses := sr.ProcessByRoute(msg)
for _, response := range responses {
pc.srvc_out_conn.Publish(pc.store_in_subject, EncodeNiasMessage(&response))
r := response
pc.srvc_out_conn.Publish(pc.store_in_subject, EncodeNiasMessage(&r))
}
ms.IncrementTracker(msg.TxID)
})
Expand All @@ -65,14 +66,10 @@ func (d *Distributor) RunNATSBus(poolsize int) {

for i := 0; i < poolsize; i++ {

i := i

sr := NewServiceRegister()
pc := NewNATSProcessChain()
ms := NewMessageStore()

log.Printf("Process Chain %d\n%#v", i, pc)

// create storage handler
go func(pc NATSProcessChain, ms *MessageStore) {

Expand All @@ -88,7 +85,8 @@ func (d *Distributor) RunNATSBus(poolsize int) {
pc.srvc_in_conn.Subscribe(pc.srvc_in_subject, func(m *NiasMessage) {
responses := sr.ProcessByRoute(m)
for _, response := range responses {
pc.srvc_out_conn.Publish(pc.store_in_subject, response)
r := response
pc.srvc_out_conn.Publish(pc.store_in_subject, r)
}
ms.IncrementTracker(m.TxID)
})
Expand All @@ -112,14 +110,10 @@ func (d *Distributor) RunMemBus(poolsize int) {

for i := 0; i < poolsize; i++ {

i := i

sr := NewServiceRegister()
pc := NewMemProcessChain()
ms := NewMessageStore()

log.Printf("Process Chain %d\n%#v", i, pc)

// create storage handler
go func(pc MemProcessChain, ms *MessageStore) {

Expand All @@ -135,9 +129,11 @@ func (d *Distributor) RunMemBus(poolsize int) {

for {
msg := <-pc.req_chan
// log.Printf("\t\tservice handler recieved msg: %+v", msg)
responses := sr.ProcessByRoute(msg)
for _, response := range responses {
pc.store_chan <- &response
r := response
pc.store_chan <- &r
}
ms.IncrementTracker(msg.TxID)
}
Expand Down
4 changes: 4 additions & 0 deletions lib/serviceregister.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (sr *ServiceRegister) ProcessByRoute(m *NiasMessage) []NiasMessage {

route := m.Route

// log.Printf("\t\tservice register recieved msg: %+v", m)

for _, sname := range route {

// retrieve service from registry & execute
Expand All @@ -109,6 +111,7 @@ func (sr *ServiceRegister) ProcessByRoute(m *NiasMessage) []NiasMessage {
log.Println("\t", err)
} else {
// pass the responses to the message store
// log.Printf("\t\tservice %s returned %d responses: %+v", sname, len(responses), responses)
for _, r := range responses {
response := r
response.Source = sname
Expand All @@ -117,6 +120,7 @@ func (sr *ServiceRegister) ProcessByRoute(m *NiasMessage) []NiasMessage {
}
}

// log.Printf("\t\tresponse messages: %+v", response_msgs)
return response_msgs

}
9 changes: 5 additions & 4 deletions lib/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,6 @@ func enqueueXML(file multipart.File) (IngestResponse, error) {
msg.Target = VALIDATION_PREFIX
msg.Route = VALIDATION_ROUTE

// xml_ec.Publish(REQUEST_TOPIC, msg)
// xml_conn.Publish(REQUEST_TOPIC, EncodeNiasMessage(msg))
// req_chan <- msg
publish(msg)

}
Expand All @@ -175,7 +172,11 @@ func (nws *NIASWebServer) Run() {
case "NATS":
req_ec = CreateNATSConnection()
case "STAN":
req_conn, _ = stan.Connect(NIAS_CLUSTER_ID, nuid.Next())
var stan_err error
req_conn, stan_err = stan.Connect(NIAS_CLUSTER_ID, nuid.Next())
if stan_err != nil {
log.Fatalf("Unable to connect to STAN server with cluster id: %s\nError:%s\nService aborting...", NIAS_CLUSTER_ID, stan_err)
}
}

log.Println("Initialising uuid generator")
Expand Down

0 comments on commit 2752c8f

Please sign in to comment.