From ead68dd7ab2ab502cefa7f0b35d382cb8da5abd6 Mon Sep 17 00:00:00 2001 From: Nick Nicholas Date: Mon, 27 Mar 2017 11:49:59 +1100 Subject: [PATCH] Pearson ingest --- naprr/contractor_csvdata_ingest.go | 53 ++++++++++++++------------- naprr/reportbuilder.go | 33 +++++++++++++++++ naprr/reportgenerator.go | 49 +++++++++++++++++++++++++ naprr/reportwriter.go | 57 +++++++++++++++++++++++++++++- naprr/streamreader.go | 45 +++++++++++++++++++++++ 5 files changed, 211 insertions(+), 26 deletions(-) diff --git a/naprr/contractor_csvdata_ingest.go b/naprr/contractor_csvdata_ingest.go index 3b57e79..1f8497f 100644 --- a/naprr/contractor_csvdata_ingest.go +++ b/naprr/contractor_csvdata_ingest.go @@ -195,32 +195,35 @@ func pearson2sifTestDisruption(s string) string { } func pearson2sifAdjustment(s string) []string { - switch s { - case "ET": - return []string{"ETA"} // conventional choice against ETB or ETC - case "RB": - return []string{"RBK"} - case "SS": - return []string{"SUP"} // conventional choice of NAPLAN Support for Separate Supervision - case "OS": - return []string{"OSS"} - case "SC": - return []string{"SCR"} - case "TY": - return []string{"AST"} // conventional choice of "assistive technology" for "typed response/attachment" - case "CT": - return []string{"AST"} - case "SP": - return []string{"SUP"} - case "CO": - return []string{"COL"} - case "SR": - return []string{"AIA"} - case "OT": - return []string{"AST"} // conventional choice of Assistive Technology for Othre ---- make it pearson-other - default: - return []string{} + ret := make([]string, 0) + for i := 0; i < len(s); i = i + 2 { + s1 := s[i : i+2] + switch s1 { + case "ET": + ret = append(ret, "ETA") // conventional choice against ETB or ETC + case "RB": + ret = append(ret, "RBK") + case "SS": + ret = append(ret, "SUP") // conventional choice of NAPLAN Support for Separate Supervision + case "OS": + ret = append(ret, "OSS") + case "SC": + ret = append(ret, "SCR") + case "TY": + ret = append(ret, "AST") // conventional choice of "assistive technology" for "typed response/attachment" + case "CT": + ret = append(ret, "AST") + case "SP": + ret = append(ret, "SUP") + case "CO": + ret = append(ret, "COL") + case "SR": + ret = append(ret, "AIA") + case "OT": + ret = append(ret, "AST") // conventional choice of Assistive Technology for Other ---- make it pearson-other + } } + return ret } func wrapMessage(regr interface{}, i int, txid string, route string) *lib.NiasMessage { diff --git a/naprr/reportbuilder.go b/naprr/reportbuilder.go index d67b433..3dcf4e1 100644 --- a/naprr/reportbuilder.go +++ b/naprr/reportbuilder.go @@ -40,6 +40,32 @@ func (rb *ReportBuilder) Run() { } +// Year 3 Writing +func (rb *ReportBuilder) RunYr3W(schools bool) { + + var wg sync.WaitGroup + + schoolslist := rb.sr.GetSchoolDetails() + nd := rb.sr.GetNAPLANData() + + if schools { + for _, subslice := range schoolslist { + for _, school := range subslice { + wg.Add(1) + go rb.createSchoolReports(nd, school.ACARAId, &wg) + } + } + } + + wg.Add(1) + go rb.createYr3WReports(nd, &wg) + + // block until all reports generated + wg.Wait() + log.Println("All reports generated") + +} + // generate school-level data reports func (rb *ReportBuilder) createSchoolReports(nd *NAPLANData, acaraid string, wg *sync.WaitGroup) { sd := rb.sr.GetSchoolData(acaraid) @@ -59,3 +85,10 @@ func (rb *ReportBuilder) createTestReports(nd *NAPLANData, wg *sync.WaitGroup) { log.Println("Codeframe data created.") wg.Done() } + +// generate test-level reports +func (rb *ReportBuilder) createYr3WReports(nd *NAPLANData, wg *sync.WaitGroup) { + rb.rg.GenerateYr3WData(nd) + log.Println("Year 3 Writing XML data created.") + wg.Done() +} diff --git a/naprr/reportgenerator.go b/naprr/reportgenerator.go index 97fb97b..386d428 100644 --- a/naprr/reportgenerator.go +++ b/naprr/reportgenerator.go @@ -24,6 +24,55 @@ func NewReportGenerator() *ReportGenerator { // routines to build the required reports // +// generate XML representations of ingested data, used in first instance to deal with ingests of Year 3 Writing data +// generated only once as represents structure of test not school-level data +func (rg *ReportGenerator) GenerateYr3WData(nd *NAPLANData) { + + count := 0 + cfds := make([]CodeFrameDataSet, 0) + + for _, codeframe := range nd.Codeframes { + for _, cf_testlet := range codeframe.TestletList.Testlet { + tl := nd.Testlets[cf_testlet.NAPTestletRefId] + // log.Printf("\t%s", tl.TestletContent.TestletName) + for _, cf_item := range cf_testlet.TestItemList.TestItem { + ti := nd.Items[cf_item.TestItemRefId] + // log.Printf("\t\t%s", ti.TestItemContent.ItemName) + cfd := CodeFrameDataSet{ + Test: nd.Tests[codeframe.NAPTestRefId], + Testlet: tl, + Item: ti, + } + cfds = append(cfds, cfd) + } + } + } + + count = len(cfds) + + // publish the records + for _, cfd := range cfds { + payload, err := rg.ge.Encode(cfd) + if err != nil { + log.Println("unable to encode codeframe: ", err) + } + // log.Printf("\t%s - %s - %s", cfd.Test.TestContent.TestDomain, + // cfd.Testlet.TestletContent.TestletName, cfd.Item.TestItemContent.ItemName) + rg.sc.Publish("reports.xml", payload) + } + + // finish the transaction - completion msg + txu := lib.TxStatusUpdate{TxComplete: true} + gtxu, err := rg.ge.Encode(txu) + if err != nil { + log.Println("unable to encode txu codeframe report: ", err) + } + rg.sc.Publish("reports.cframe", gtxu) + + log.Printf("codeframe records %d: ", count) + +} + // generate codeframe objects (currently as per VCAA requirements) // generated only once as represents strucure of test not school-level data func (rg *ReportGenerator) GenerateCodeFrameData(nd *NAPLANData) { diff --git a/naprr/reportwriter.go b/naprr/reportwriter.go index c6e37c1..495c203 100644 --- a/naprr/reportwriter.go +++ b/naprr/reportwriter.go @@ -3,6 +3,7 @@ package naprr import ( "bufio" "bytes" + "encoding/xml" "fmt" "io" "log" @@ -32,11 +33,30 @@ func (rw *ReportWriter) Run() { rw.writeSchoolLevelReports(schools) rw.writeAggregateSchoolReports(schools) rw.writeTestLevelReports() + rw.writeYr3WReports() log.Println("All reports written\n") } +// create data reports from the test strucutre +func (rw *ReportWriter) writeYr3WReports() { + + log.Println("Creating test-level reports...") + + var wg sync.WaitGroup + + cfds := rw.sr.GetCodeFrameData() + + wg.Add(2) + + go rw.writeYr3WritingReport(cfds, &wg) + + wg.Wait() + + log.Println("Year 3 Writing XML created.") +} + // create data reports from the test strucutre func (rw *ReportWriter) writeTestLevelReports() { @@ -173,7 +193,7 @@ func (rw *ReportWriter) writeSchoolReports(acaraid string, wg *sync.WaitGroup) { wg.Done() } -// report of test strucure for writing items only +// report of test structure for writing items only // with extended item information func (rw *ReportWriter) writeCodeFrameWritingReport(cfds []CodeFrameDataSet, wg *sync.WaitGroup) { @@ -228,6 +248,41 @@ func (rw *ReportWriter) writeCodeFrameWritingReport(cfds []CodeFrameDataSet, wg } +// report of test structure for writing items only +// with extended item information +func (rw *ReportWriter) writeYr3WritingReport(cfds []CodeFrameDataSet, wg *sync.WaitGroup) { + + // create directory for the school + fpath := "yr3w/" + err := os.MkdirAll(fpath, os.ModePerm) + check(err) + + // create the report data file in the output directory + // delete any ecisting files and create empty new one + fname := fpath + "codeframe_writing.xml" + err = os.RemoveAll(fname) + f, err := os.Create(fname) + check(err) + defer f.Close() + + e := xml.NewEncoder(f) + e.Indent("", " ") + f.WriteString("\n") + // write the data - writing items only + for _, cfd := range cfds { + if cfd.Test.TestContent.TestDomain == "Writing" { + e.Encode(cfd) + } + } + e.Flush() + f.WriteString("\n") + + log.Printf("Codeframe writing report created for: %d elements", len(cfds)) + + wg.Done() + +} + // report of test structure, is written only once // as an aggrregate report, not at school level func (rw *ReportWriter) writeCodeFrameReport(cfds []CodeFrameDataSet, wg *sync.WaitGroup) { diff --git a/naprr/streamreader.go b/naprr/streamreader.go index 72054e2..67b5c4a 100644 --- a/naprr/streamreader.go +++ b/naprr/streamreader.go @@ -26,6 +26,51 @@ func NewStreamReader() *StreamReader { return &sr } +func (sr *StreamReader) GetXMLData() []CodeFrameDataSet { + + cfds := make([]CodeFrameDataSet, 0) + + // signal channel to notify asynch stan stream read is complete + txComplete := make(chan bool) + + // main message handling callback for the stan stream + // get names of schools that have been processed by ingest + // and create reports + mcb := func(m *stan.Msg) { + + // as we don't know message type ([]byte slice on wire) decode as interface + // then assert type dynamically + var m_if interface{} + err := sr.ge.Decode(m.Data, &m_if) + if err != nil { + log.Println("streamreader codeframe message decoding error: ", err) + txComplete <- true + } + + switch t := m_if.(type) { + case CodeFrameDataSet: + cfd := m_if.(CodeFrameDataSet) + cfds = append(cfds, cfd) + case lib.TxStatusUpdate: + txComplete <- true + default: + _ = t + // log.Printf("unknown message type in participation data handler: %v", m_if) + } + } + + sub, err := sr.sc.Subscribe("reports.pearson", mcb, stan.DeliverAllAvailable()) + defer sub.Unsubscribe() + if err != nil { + log.Println("streamreader: stan subsciption error get codeframe data: ", err) + } + + <-txComplete + + return cfds + +} + func (sr *StreamReader) GetCodeFrameData() []CodeFrameDataSet { cfds := make([]CodeFrameDataSet, 0)