Skip to content
This repository has been archived by the owner on Sep 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #29 from palavrapasse/28-enhancement-use-goroutine…
Browse files Browse the repository at this point in the history
…s-when-records-of-primary-tables-is-large

enhancement: use goroutines when records of primary tables is large
  • Loading branch information
rutesantos4 authored Jan 30, 2023
2 parents adc0398 + c9d7998 commit 7a49ec0
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 11 deletions.
8 changes: 4 additions & 4 deletions pkg/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,19 @@ func (ctx DatabaseContext[Record]) Insert(i Import) (AutoGenKey, error) {

cbs := []AnonymousErrorCallback{
func() (any, error) {
return typedInsertAndFindPrimary(TransactionContext[User](tctx), NewUserTable(us))
return typedInsertAndFindPrimary(TransactionContext[User](tctx), NewConcurrentPrimaryTable(MaxElementsOfGoroutine, us, NewUserTable))
},
func() (any, error) {
return typedInsertAndFindPrimary(TransactionContext[Credentials](tctx), NewCredentialsTable(cr))
return typedInsertAndFindPrimary(TransactionContext[Credentials](tctx), NewConcurrentPrimaryTable(MaxElementsOfGoroutine, cr, NewCredentialsTable))
},
func() (any, error) {
return typedInsertAndFindPrimary(TransactionContext[BadActor](tctx), NewBadActorTable(i.Leakers))
return typedInsertAndFindPrimary(TransactionContext[BadActor](tctx), NewConcurrentPrimaryTable(MaxElementsOfGoroutine, i.Leakers, NewBadActorTable))
},
func() (any, error) {
return typedInsertAndFindPrimary(TransactionContext[Leak](tctx), NewLeakTable(i.Leak))
},
func() (any, error) {
return typedInsertAndFindPrimary(TransactionContext[Platform](tctx), NewPlatformTable(i.AffectedPlatforms))
return typedInsertAndFindPrimary(TransactionContext[Platform](tctx), NewConcurrentPrimaryTable(MaxElementsOfGoroutine, i.AffectedPlatforms, NewPlatformTable))
},
}

Expand Down
72 changes: 65 additions & 7 deletions pkg/database/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ type concurrentHashForeignTableResult[R Record] struct {
routineId int
}

type concurrentPrimaryTableResult[R Record] struct {
primaryTable PrimaryTable[R]
routineId int
}

func MultiplePlaceholder(lv int) string {
phs := make([]string, lv)

Expand Down Expand Up @@ -252,17 +257,70 @@ func NewConcurrentHashForeignTable[F HashCredentials | HashUser, P Credentials |
close(resultChan)
}()

mapResult := make(map[int]concurrentHashForeignTableResult[F])

for r := range resultChan {
mapResult[r.routineId] = r
}

result := ForeignTable[F]{}
i := 0

for i < ngoroutines {
for r := range resultChan {
for i := 0; i < ngoroutines; i++ {
result.Records = append(result.Records, mapResult[i].hashForeignTable.Records...)
}

if r.routineId == i {
result.Records = append(result.Records, r.hashForeignTable.Records...)
i++
}
return result
}

func NewConcurrentPrimaryTable[P BadActor | User | Credentials | Platform](maxElementsOfGoroutine int, primaryElements []P, newPrimaryTableCallback func([]P) PrimaryTable[P]) PrimaryTable[P] {

ngoroutines := 1
nelements := len(primaryElements)

if nelements > maxElementsOfGoroutine {
ngoroutines = int(math.Ceil(float64(nelements) / float64(maxElementsOfGoroutine)))
}

resultChan := make(chan concurrentPrimaryTableResult[P])

var wg sync.WaitGroup

wg.Add(ngoroutines)

for i := 0; i < ngoroutines; i++ {

init := i * maxElementsOfGoroutine
end := (i + 1) * maxElementsOfGoroutine
if end > nelements {
end = nelements
}

go func(lines []P, routineId int) {

defer wg.Done()
resultChan <- concurrentPrimaryTableResult[P]{
routineId: routineId,
primaryTable: newPrimaryTableCallback(lines),
}

}(primaryElements[init:end], i)
}

go func() {
wg.Wait()
close(resultChan)
}()

mapResult := make(map[int]concurrentPrimaryTableResult[P])

for r := range resultChan {
mapResult[r.routineId] = r
}

result := PrimaryTable[P]{}

for i := 0; i < ngoroutines; i++ {
result.Records = append(result.Records, mapResult[i].primaryTable.Records...)
}

return result
Expand Down
24 changes: 24 additions & 0 deletions pkg/database/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,27 @@ func TestNewConcurrentHashForeignTableWithGoRoutinesReturnsTheSameAsNoRoutineFun
t.Fatalf("NewConcurrentHashForeignTable should have returned the same result and order as the function with no goroutines, but got: %v", result)
}
}

func TestNewConcurrentPrimaryTableWithoutGoRoutinesReturnsTheSameAsNoRoutineFunction(t *testing.T) {
tb := []Credentials{{Password: Password("pass1word")}, {Password: Password("pass2word")}, {Password: Password("pass3word")}, {Password: Password("pass3word")}}

expected := NewCredentialsTable(tb)

result := NewConcurrentPrimaryTable(len(tb), tb, NewCredentialsTable)

if !reflect.DeepEqual(expected, result) {
t.Fatalf("NewConcurrentPrimaryTable should have returned the same result and order as the function with no goroutines, but got: %v", result)
}
}

func TestNewConcurrentPrimaryTableWithGoRoutinesReturnsTheSameAsNoRoutineFunction(t *testing.T) {
tb := []Credentials{{Password: Password("pass1word")}, {Password: Password("pass2word")}, {Password: Password("pass3word")}, {Password: Password("pass4word")}, {Password: Password("pass5word")}}

expected := NewCredentialsTable(tb)

result := NewConcurrentPrimaryTable(len(tb)-2, tb, NewCredentialsTable)

if !reflect.DeepEqual(expected, result) {
t.Fatalf("NewConcurrentPrimaryTable should have returned the same result and order as the function with no goroutines, but got: %v", result)
}
}

0 comments on commit 7a49ec0

Please sign in to comment.