Skip to content

Commit

Permalink
Pearson ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
opoudjis committed Mar 27, 2017
1 parent 9a19e66 commit ead68dd
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 26 deletions.
53 changes: 28 additions & 25 deletions naprr/contractor_csvdata_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,32 +195,35 @@ func pearson2sifTestDisruption(s string) string {
}

func pearson2sifAdjustment(s string) []string {
switch s {
case "ET":
return []string{"ETA"} // conventional choice against ETB or ETC
case "RB":
return []string{"RBK"}
case "SS":
return []string{"SUP"} // conventional choice of NAPLAN Support for Separate Supervision
case "OS":
return []string{"OSS"}
case "SC":
return []string{"SCR"}
case "TY":
return []string{"AST"} // conventional choice of "assistive technology" for "typed response/attachment"
case "CT":
return []string{"AST"}
case "SP":
return []string{"SUP"}
case "CO":
return []string{"COL"}
case "SR":
return []string{"AIA"}
case "OT":
return []string{"AST"} // conventional choice of Assistive Technology for Othre ---- make it pearson-other
default:
return []string{}
ret := make([]string, 0)
for i := 0; i < len(s); i = i + 2 {
s1 := s[i : i+2]
switch s1 {
case "ET":
ret = append(ret, "ETA") // conventional choice against ETB or ETC
case "RB":
ret = append(ret, "RBK")
case "SS":
ret = append(ret, "SUP") // conventional choice of NAPLAN Support for Separate Supervision
case "OS":
ret = append(ret, "OSS")
case "SC":
ret = append(ret, "SCR")
case "TY":
ret = append(ret, "AST") // conventional choice of "assistive technology" for "typed response/attachment"
case "CT":
ret = append(ret, "AST")
case "SP":
ret = append(ret, "SUP")
case "CO":
ret = append(ret, "COL")
case "SR":
ret = append(ret, "AIA")
case "OT":
ret = append(ret, "AST") // conventional choice of Assistive Technology for Other ---- make it pearson-other
}
}
return ret
}

func wrapMessage(regr interface{}, i int, txid string, route string) *lib.NiasMessage {
Expand Down
33 changes: 33 additions & 0 deletions naprr/reportbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,32 @@ func (rb *ReportBuilder) Run() {

}

// Year 3 Writing
func (rb *ReportBuilder) RunYr3W(schools bool) {

var wg sync.WaitGroup

schoolslist := rb.sr.GetSchoolDetails()
nd := rb.sr.GetNAPLANData()

if schools {
for _, subslice := range schoolslist {
for _, school := range subslice {
wg.Add(1)
go rb.createSchoolReports(nd, school.ACARAId, &wg)
}
}
}

wg.Add(1)
go rb.createYr3WReports(nd, &wg)

// block until all reports generated
wg.Wait()
log.Println("All reports generated")

}

// generate school-level data reports
func (rb *ReportBuilder) createSchoolReports(nd *NAPLANData, acaraid string, wg *sync.WaitGroup) {
sd := rb.sr.GetSchoolData(acaraid)
Expand All @@ -59,3 +85,10 @@ func (rb *ReportBuilder) createTestReports(nd *NAPLANData, wg *sync.WaitGroup) {
log.Println("Codeframe data created.")
wg.Done()
}

// generate test-level reports
func (rb *ReportBuilder) createYr3WReports(nd *NAPLANData, wg *sync.WaitGroup) {
rb.rg.GenerateYr3WData(nd)
log.Println("Year 3 Writing XML data created.")
wg.Done()
}
49 changes: 49 additions & 0 deletions naprr/reportgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,55 @@ func NewReportGenerator() *ReportGenerator {
// routines to build the required reports
//

// generate XML representations of ingested data, used in first instance to deal with ingests of Year 3 Writing data
// generated only once as represents structure of test not school-level data
func (rg *ReportGenerator) GenerateYr3WData(nd *NAPLANData) {

count := 0
cfds := make([]CodeFrameDataSet, 0)

for _, codeframe := range nd.Codeframes {
for _, cf_testlet := range codeframe.TestletList.Testlet {
tl := nd.Testlets[cf_testlet.NAPTestletRefId]
// log.Printf("\t%s", tl.TestletContent.TestletName)
for _, cf_item := range cf_testlet.TestItemList.TestItem {
ti := nd.Items[cf_item.TestItemRefId]
// log.Printf("\t\t%s", ti.TestItemContent.ItemName)
cfd := CodeFrameDataSet{
Test: nd.Tests[codeframe.NAPTestRefId],
Testlet: tl,
Item: ti,
}
cfds = append(cfds, cfd)
}
}
}

count = len(cfds)

// publish the records
for _, cfd := range cfds {
payload, err := rg.ge.Encode(cfd)
if err != nil {
log.Println("unable to encode codeframe: ", err)
}
// log.Printf("\t%s - %s - %s", cfd.Test.TestContent.TestDomain,
// cfd.Testlet.TestletContent.TestletName, cfd.Item.TestItemContent.ItemName)
rg.sc.Publish("reports.xml", payload)
}

// finish the transaction - completion msg
txu := lib.TxStatusUpdate{TxComplete: true}
gtxu, err := rg.ge.Encode(txu)
if err != nil {
log.Println("unable to encode txu codeframe report: ", err)
}
rg.sc.Publish("reports.cframe", gtxu)

log.Printf("codeframe records %d: ", count)

}

// generate codeframe objects (currently as per VCAA requirements)
// generated only once as represents strucure of test not school-level data
func (rg *ReportGenerator) GenerateCodeFrameData(nd *NAPLANData) {
Expand Down
57 changes: 56 additions & 1 deletion naprr/reportwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package naprr
import (
"bufio"
"bytes"
"encoding/xml"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -32,11 +33,30 @@ func (rw *ReportWriter) Run() {
rw.writeSchoolLevelReports(schools)
rw.writeAggregateSchoolReports(schools)
rw.writeTestLevelReports()
rw.writeYr3WReports()

log.Println("All reports written\n")

}

// create data reports from the test strucutre
func (rw *ReportWriter) writeYr3WReports() {

log.Println("Creating test-level reports...")

var wg sync.WaitGroup

cfds := rw.sr.GetCodeFrameData()

wg.Add(2)

go rw.writeYr3WritingReport(cfds, &wg)

wg.Wait()

log.Println("Year 3 Writing XML created.")
}

// create data reports from the test strucutre
func (rw *ReportWriter) writeTestLevelReports() {

Expand Down Expand Up @@ -173,7 +193,7 @@ func (rw *ReportWriter) writeSchoolReports(acaraid string, wg *sync.WaitGroup) {
wg.Done()
}

// report of test strucure for writing items only
// report of test structure for writing items only
// with extended item information
func (rw *ReportWriter) writeCodeFrameWritingReport(cfds []CodeFrameDataSet, wg *sync.WaitGroup) {

Expand Down Expand Up @@ -228,6 +248,41 @@ func (rw *ReportWriter) writeCodeFrameWritingReport(cfds []CodeFrameDataSet, wg

}

// report of test structure for writing items only
// with extended item information
func (rw *ReportWriter) writeYr3WritingReport(cfds []CodeFrameDataSet, wg *sync.WaitGroup) {

// create directory for the school
fpath := "yr3w/"
err := os.MkdirAll(fpath, os.ModePerm)
check(err)

// create the report data file in the output directory
// delete any ecisting files and create empty new one
fname := fpath + "codeframe_writing.xml"
err = os.RemoveAll(fname)
f, err := os.Create(fname)
check(err)
defer f.Close()

e := xml.NewEncoder(f)
e.Indent("", " ")
f.WriteString("<NAPResulsReporting>\n")
// write the data - writing items only
for _, cfd := range cfds {
if cfd.Test.TestContent.TestDomain == "Writing" {
e.Encode(cfd)
}
}
e.Flush()
f.WriteString("</NAPResulsReporting>\n")

log.Printf("Codeframe writing report created for: %d elements", len(cfds))

wg.Done()

}

// report of test structure, is written only once
// as an aggrregate report, not at school level
func (rw *ReportWriter) writeCodeFrameReport(cfds []CodeFrameDataSet, wg *sync.WaitGroup) {
Expand Down
45 changes: 45 additions & 0 deletions naprr/streamreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,51 @@ func NewStreamReader() *StreamReader {
return &sr
}

func (sr *StreamReader) GetXMLData() []CodeFrameDataSet {

cfds := make([]CodeFrameDataSet, 0)

// signal channel to notify asynch stan stream read is complete
txComplete := make(chan bool)

// main message handling callback for the stan stream
// get names of schools that have been processed by ingest
// and create reports
mcb := func(m *stan.Msg) {

// as we don't know message type ([]byte slice on wire) decode as interface
// then assert type dynamically
var m_if interface{}
err := sr.ge.Decode(m.Data, &m_if)
if err != nil {
log.Println("streamreader codeframe message decoding error: ", err)
txComplete <- true
}

switch t := m_if.(type) {
case CodeFrameDataSet:
cfd := m_if.(CodeFrameDataSet)
cfds = append(cfds, cfd)
case lib.TxStatusUpdate:
txComplete <- true
default:
_ = t
// log.Printf("unknown message type in participation data handler: %v", m_if)
}
}

sub, err := sr.sc.Subscribe("reports.pearson", mcb, stan.DeliverAllAvailable())
defer sub.Unsubscribe()
if err != nil {
log.Println("streamreader: stan subsciption error get codeframe data: ", err)
}

<-txComplete

return cfds

}

func (sr *StreamReader) GetCodeFrameData() []CodeFrameDataSet {

cfds := make([]CodeFrameDataSet, 0)
Expand Down

0 comments on commit ead68dd

Please sign in to comment.