Skip to content

Commit

Permalink
Debug Pearson ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
opoudjis committed Mar 30, 2017
1 parent 2be83f1 commit 3248b93
Show file tree
Hide file tree
Showing 16 changed files with 303 additions and 276 deletions.
7 changes: 4 additions & 3 deletions app/naprr/naprr.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ func main() {

di := naprr.NewDataIngest()
di.Run()
//di.RunYr3Writing()
di.RunYr3Writing()
di.Close()

log.Println("Generating report data, Year 3 Writing...")
rb := naprr.NewReportBuilder()
//rb.RunYr3W(false)
log.Println("Generating report data...")
rb.Run()
log.Println("Generating report data, Year 3 Writing...")
rb.RunYr3W(false)
}

log.Println("Writing report files...")
Expand Down
82 changes: 49 additions & 33 deletions naprr/contractor_csvdata_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,19 @@ import (
"github.com/twinj/uuid"
"log"
"path/filepath"
"sync"
//"sync"
)

func (di *DataIngest) RunYr3Writing() {

uuid.Init()
csvFiles := parsePearsonCSVFileDirectory()

var wg sync.WaitGroup

for _, csvFile := range csvFiles {
wg.Add(1)
go di.ingestPearsonResultsFile(csvFile, &wg)
di.ingestPearsonResultsFile(csvFile)
}

wg.Wait()

di.finaliseTransactions()

di.sc.Close()

log.Println("All data files read, ingest complete.")
log.Println("All Yr 3 Writing data files read, ingest complete.")
}

func parsePearsonCSVFileDirectory() []string {
Expand Down Expand Up @@ -237,7 +228,7 @@ func wrapMessage(regr interface{}, i int, txid string, route string) *lib.NiasMe
return msg
}

func (di *DataIngest) ingestPearsonResultsFile(resultsFilePath string, wg *sync.WaitGroup) {
func (di *DataIngest) ingestPearsonResultsFile(resultsFilePath string) {

// create a connection to the streaming server
log.Println("Connecting to STAN server...")
Expand All @@ -260,43 +251,65 @@ func (di *DataIngest) ingestPearsonResultsFile(resultsFilePath string, wg *sync.
testRefId := uuid.NewV4().String()
naptest := nxml.NAPTest{TestID: testRefId}
//naptest.TestContent = nxml.TestContent{LocalId: r["booklet_id"], TestLevel: "3", TestDomain: "Writing"}
naptest.TestContent = nxml.TestContent{TestLevel: "3", TestDomain: "Writing"}
naptest.TestContent = nxml.TestContent{TestLevel: "3",
TestDomain: "Writing",
TestYear: "2017",
}
gt, err := di.ge.Encode(naptest)
if err != nil {
log.Println("Unable to gob-encode nap test: ", err)
}
di.sc.Publish("meta_yr3w", gt)
di.sc.Publish(META_YR3W_STREAM, gt)

// we will set up 1 testlet and 10 items, which are assessed in 1 rubric each
testletRefId := uuid.NewV4().String()
naptestlet := nxml.NAPTestlet{TestletID: testletRefId,
NAPTestRefId: testRefId,
}
naptestlet.TestletContent.LocationInStage = "1"
naptestlet.TestItemList.TestItem = make([]nxml.NAPTestlet_TestItem, 10)
for i := range naptestlet.TestItemList.TestItem {
naptestlet.TestItemList.TestItem[i] = nxml.NAPTestlet_TestItem{TestItemRefId: uuid.NewV4().String(),
SequenceNumber: string(i)}
tiri := uuid.NewV4().String()
naptestlet.TestItemList.TestItem[i] = nxml.NAPTestlet_TestItem{TestItemRefId: tiri,
TestItemLocalId: tiri,
SequenceNumber: fmt.Sprintf("%d", i)}
}
gtl, err := di.ge.Encode(naptestlet)
if err != nil {
log.Println("Unable to gob-encode nap testlet: ", err)
}
di.sc.Publish("meta_yr3w", gtl)
di.sc.Publish(META_YR3W_STREAM, gtl)

rubrics := [10]string{"audience", "text structure", "ideas", "character and setting", "vocabulary",
"cohesion", "paragraphing", "sentence structure", "punctuation", "spelling"}
rubricsabbr := [10]string{"AUD", "TXT", "IDE", "CAS", "VOC", "COH", "PAR", "SEN", "PUN", "SPE"}
rubricsabbr := [10]string{"aud", "txt", "ide", "cas", "voc", "coh", "par", "sen", "pun", "spe"}
maxscores := [10]string{"6", "4", "5", "4", "5", "4", "2", "6", "5", "6"}

for i := range naptestlet.TestItemList.TestItem {
naptestitem := nxml.NAPTestItem{ItemID: naptestlet.TestItemList.TestItem[i].TestItemRefId}
naptestitem.TestItemContent = nxml.TestItemContent{ItemName: rubrics[i]}
gti, err := di.ge.Encode(naptestlet)
naptestitem.TestItemContent = nxml.TestItemContent{ItemName: rubrics[i],
NAPTestItemLocalId: naptestlet.TestItemList.TestItem[i].TestItemRefId}
gti, err := di.ge.Encode(naptestitem)
if err != nil {
log.Println("Unable to gob-encode nap test item: ", err)
}
di.sc.Publish("meta_yr3w", gti)
di.sc.Publish(META_YR3W_STREAM, gti)
}

codeframe := nxml.NAPCodeFrame{RefId: uuid.NewV4().String(),
NAPTestRefId: testRefId,
}
codeframe.TestletList.Testlet = make([]nxml.NAPCodeFrame_Testlet, 1)
codeframe.TestletList.Testlet[0].NAPTestletRefId = testletRefId
codeframe.TestletList.Testlet[0].TestItemList.TestItem = make([]nxml.NAPCodeFrame_TestItem, 10)
for i := range naptestlet.TestItemList.TestItem {
codeframe.TestletList.Testlet[0].TestItemList.TestItem[i].TestItemRefId = naptestlet.TestItemList.TestItem[i].TestItemRefId
}
gtcf, err := di.ge.Encode(codeframe)
if err != nil {
log.Println("Unable to gob-encode nap codeframe: ", err)
}
di.sc.Publish(META_YR3W_STREAM, gtcf)

//defer file.Close()
totalStudents := 0
Expand Down Expand Up @@ -324,6 +337,8 @@ func (di *DataIngest) ingestPearsonResultsFile(resultsFilePath string, wg *sync.
ClassGroup: r["home_group"],
SchoolLocalId: r["school_vcaa_code"],
ASLSchoolId: r["asl_school_code"], // may be missing
YearLevel: "3",
TestLevel: "3",
}
// stopgap
if r["asl_school_code"] != "" {
Expand All @@ -336,7 +351,7 @@ func (di *DataIngest) ingestPearsonResultsFile(resultsFilePath string, wg *sync.
}
// store linkage locally
ss_link[regr.RefId] = regr.ASLSchoolId
di.sc.Publish("studentAndResults", gsp)
di.sc.Publish(RESULTS_YR3W_STREAM, gsp)
totalStudents++
event := nxml.NAPEvent{EventID: uuid.NewV4().String(),
SPRefID: studentRefId,
Expand All @@ -349,20 +364,20 @@ func (di *DataIngest) ingestPearsonResultsFile(resultsFilePath string, wg *sync.
event.Adjustment.BookletType = r["special_provision_other_text"]
if r["special_provision"] != "" {
event.Adjustment.PNPCodelist = struct {
PNPCode []string `xml:"PNPCode"`
PNPCode []string `xml:"PNPCode,omitempty"`
}{PNPCode: pearson2sifAdjustment(r["special_provision"])}
}
if r["reason_for_withholding"] != "" {
event.TestDisruptionList.TestDisruption = make([]struct {
Event string `xml:"Event"`
Event string `xml:"Event,omitempty"`
}, 1)
event.TestDisruptionList.TestDisruption[0].Event = pearson2sifTestDisruption(r["reason_for_withholding"])
}
ge, err := di.ge.Encode(event)
if err != nil {
log.Println("Unable to gob-encode nap event link: ", err)
}
di.sc.Publish("studentAndResults", ge)
di.sc.Publish(RESULTS_YR3W_STREAM, ge)

response := nxml.NAPResponseSet{ResponseID: uuid.NewV4().String(),
StudentID: studentRefId,
Expand All @@ -373,7 +388,8 @@ func (di *DataIngest) ingestPearsonResultsFile(resultsFilePath string, wg *sync.
response.TestletList.Testlet[0].ItemResponseList.ItemResponse = make([]nxml.NAPResponseSet_ItemResponse, 10)

for i := range naptestlet.TestItemList.TestItem {
response.TestletList.Testlet[0].ItemResponseList.ItemResponse[i] = nxml.NAPResponseSet_ItemResponse{ItemRefID: naptestlet.TestItemList.TestItem[i].TestItemRefId}
response.TestletList.Testlet[0].ItemResponseList.ItemResponse[i] = nxml.NAPResponseSet_ItemResponse{ItemRefID: naptestlet.TestItemList.TestItem[i].TestItemRefId,
SequenceNumber: naptestlet.TestItemList.TestItem[i].SequenceNumber}

response.TestletList.Testlet[0].ItemResponseList.ItemResponse[i].SubscoreList.Subscore = make([]nxml.NAPResponseSet_Subscore, 1)
}
Expand All @@ -384,7 +400,7 @@ func (di *DataIngest) ingestPearsonResultsFile(resultsFilePath string, wg *sync.
if err != nil {
log.Println("Unable to gob-encode student response set: ", err)
}
di.sc.Publish("studentAndResults", gr)
di.sc.Publish(RESULTS_YR3W_STREAM, gr)

}
log.Println("Finished reading data file...")
Expand All @@ -394,16 +410,16 @@ func (di *DataIngest) ingestPearsonResultsFile(resultsFilePath string, wg *sync.
if err != nil {
log.Println("Unable to gob-encode tx complete message: ", err)
}
di.sc.Publish("studentAndResults", geot)
di.sc.Publish("meta_yr3w", geot)
di.sc.Publish(RESULTS_YR3W_STREAM, geot)
di.sc.Publish(META_YR3W_STREAM, geot)

//di.assignResponsesToSchools(ss_link)

//log.Println("response assignment complete")

log.Printf("ingestion complete for %s", resultsFilePath)

wg.Done()
//wg.Done()

}

Expand All @@ -416,8 +432,8 @@ func rubricPopulate(seqPos int, abbr string, rubric string, maxscore string, r m
} else {
response.TestletList.Testlet[0].ItemResponseList.ItemResponse[seqPos].ResponseCorrectness = "Incorrect"
}
response.TestletList.Testlet[0].ItemResponseList.ItemResponse[0].SubscoreList.Subscore[seqPos].SubscoreType = rubric
response.TestletList.Testlet[0].ItemResponseList.ItemResponse[0].SubscoreList.Subscore[seqPos].SubscoreValue = r[abbr]
response.TestletList.Testlet[0].ItemResponseList.ItemResponse[seqPos].SubscoreList.Subscore[0].SubscoreType = rubric
response.TestletList.Testlet[0].ItemResponseList.ItemResponse[seqPos].SubscoreList.Subscore[0].SubscoreValue = r[abbr]
}
return response
}
17 changes: 9 additions & 8 deletions naprr/data_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func NewDataIngest() *DataIngest {
return &di
}

func (di *DataIngest) Close() {
di.sc.Close()
}

func (di *DataIngest) Run() {

xmlFiles := parseXMLFileDirectory()
Expand All @@ -37,15 +41,12 @@ func (di *DataIngest) Run() {

di.finaliseTransactions()

di.sc.Close()

log.Println("All data files read, ingest complete.")
}

func (di *DataIngest) RunSynchronous(FilePath string) {
di.ingestResultsFile(FilePath, nil)
di.finaliseTransactions()
di.sc.Close()
}

func parseXMLFileDirectory() []string {
Expand Down Expand Up @@ -113,7 +114,7 @@ func (di *DataIngest) ingestResultsFile(resultsFilePath string, wg *sync.WaitGro
if err != nil {
log.Println("Unable to gob-encode nap test: ", err)
}
di.sc.Publish("meta", gt)
di.sc.Publish(META_STREAM, gt)
totalTests++

case "NAPTestlet":
Expand All @@ -123,7 +124,7 @@ func (di *DataIngest) ingestResultsFile(resultsFilePath string, wg *sync.WaitGro
if err != nil {
log.Println("Unable to gob-encode nap testlet: ", err)
}
di.sc.Publish("meta", gtl)
di.sc.Publish(META_STREAM, gtl)
totalTestlets++

case "NAPTestItem":
Expand All @@ -133,7 +134,7 @@ func (di *DataIngest) ingestResultsFile(resultsFilePath string, wg *sync.WaitGro
if err != nil {
log.Println("Unable to gob-encode nap test item: ", err)
}
di.sc.Publish("meta", gti)
di.sc.Publish(META_STREAM, gti)
totalTestItems++

case "NAPTestScoreSummary":
Expand Down Expand Up @@ -173,7 +174,7 @@ func (di *DataIngest) ingestResultsFile(resultsFilePath string, wg *sync.WaitGro
if err != nil {
log.Println("Unable to gob-encode nap codeframe: ", err)
}
di.sc.Publish("meta", gcf)
di.sc.Publish(META_STREAM, gcf)
totalCodeFrames++

case "SchoolInfo":
Expand Down Expand Up @@ -231,7 +232,7 @@ func (di *DataIngest) ingestResultsFile(resultsFilePath string, wg *sync.WaitGro
log.Println("Unable to gob-encode tx complete message: ", err)
}
di.sc.Publish("responses", geot)
di.sc.Publish("meta", geot)
di.sc.Publish(META_STREAM, geot)

di.assignResponsesToSchools(ss_link)

Expand Down
2 changes: 1 addition & 1 deletion naprr/naprr_webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (rrs *ResultsReportingServer) Run() {
//
e.GET("/naprr/codeframe", func(c echo.Context) error {

cfds := rrs.sr.GetCodeFrameData()
cfds := rrs.sr.GetCodeFrameData("report.cframe")

return c.JSON(http.StatusAccepted, cfds)
})
Expand Down
6 changes: 3 additions & 3 deletions naprr/reportbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (rb *ReportBuilder) Run() {
var wg sync.WaitGroup

schools := rb.sr.GetSchoolDetails()
nd := rb.sr.GetNAPLANData("meta")
nd := rb.sr.GetNAPLANData(META_STREAM)

for _, subslice := range schools {
for _, school := range subslice {
Expand All @@ -45,13 +45,13 @@ func (rb *ReportBuilder) RunYr3W(schools bool) {

var wg sync.WaitGroup

schoolslist := rb.sr.GetSchoolDetails()
nd := rb.sr.GetNAPLANData("meta_yr3w")
log.Println("Getting student data")
sr := rb.sr.GetStudentAndResultsData()
log.Println("Gotten student data")
nd := rb.sr.GetNAPLANData(META_YR3W_STREAM)

if schools {
schoolslist := rb.sr.GetSchoolDetails()
for _, subslice := range schoolslist {
for _, school := range subslice {
wg.Add(1)
Expand Down
Loading

0 comments on commit 3248b93

Please sign in to comment.