Skip to content

Commit

Permalink
Merge pull request #24 from damienfamed75/squash
Browse files Browse the repository at this point in the history
Squash Bugs & Errors
  • Loading branch information
EviiViviana authored Oct 14, 2019
2 parents 54ae9d8 + 09c379f commit 98c9a9a
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 30 deletions.
3 changes: 1 addition & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ func (c *Client) InsertNode(ctx context.Context, dg *dgo.Dgraph, o *Operation) (
case o.SetSingleDupleNode != nil:
_, err = c.mutateSingleDupleNode(ctx, dg, o.SetSingleDupleNode, uidMap, &sync.Mutex{})
case o.SetMultiDupleNode != nil:
// TODO work out some way to convert the slice to []interface{} without copying.
// This could possibly be using the "unsafe" package.
// To safely convert the []interface{} we loop through even if the process may be slow and painful.
tmp := make([]interface{}, len(o.SetMultiDupleNode))
for i, t := range o.SetMultiDupleNode {
tmp[i] = t
Expand Down
2 changes: 1 addition & 1 deletion examples/schemacheck/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func main() {
// you may set that when creating the client and using
// quirk.WithPredicateKey(predicateName string)
for k, v := range uidMap {
log.Printf("UIDMap: [%s] [%s]\n", k, v)
log.Printf("UIDMap: [%v] [%v]\n", k, v)
}

// t := dg.NewReadOnlyTxn()
Expand Down
12 changes: 12 additions & 0 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"sync"

"github.com/cheggaaa/pb/v3"
"github.com/damienfamed75/yalp"
"github.com/dgraph-io/dgo/v2"
)

Expand Down Expand Up @@ -61,6 +63,16 @@ type (
identifier string
uid string
}

// workerPackage is used for some common items that a worker needs
// that do not include the channels or essential UID map.
workerPackage struct {
dg *dgo.Dgraph
m *sync.Mutex
mutateSingleStruct mutateSingle
logger yalp.Logger
bar *pb.ProgressBar
}
)

// interfaces used within for testing.
Expand Down
35 changes: 21 additions & 14 deletions mutate_multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"sync"

"github.com/damienfamed75/yalp"

"github.com/cheggaaa/pb/v3"
"github.com/dgraph-io/dgo/v2"
)
Expand All @@ -15,7 +13,7 @@ func (c *Client) mutateMulti(ctx context.Context, dg *dgo.Dgraph,
dat []interface{}, uidMap map[string]UID, mutateFunc mutateSingle) error {
// Create waitgroup and channels.
var (
wg sync.WaitGroup
// wg sync.WaitGroup
m sync.Mutex
limit = c.maxWorkerCount
datLen = len(dat)
Expand All @@ -33,11 +31,23 @@ func (c *Client) mutateMulti(ctx context.Context, dg *dgo.Dgraph,
bar := pb.ProgressBarTemplate(c.template).Start(datLen)
bar.SetWidth(bar.Width()/2 + bar.Width()/4)

// pkg is the more non-focused items that when reading through as a new
// user, you don't need to focus on as much as some others. For example
// the user would want to see the path of the UID map and finding it
// will be easier with the lesser amount of parameters.
pkg := &workerPackage{
dg: dg,
m: &m,
mutateSingleStruct: mutateFunc,
logger: c.logger,
bar: bar,
}

// Launch workers.
for i := 0; i < limit; i++ {
wg.Add(1)
go mutationWorker(ctx, dg, &wg, &m, mutateFunc, c.logger, bar,
uidMap, read, quit, done)
// go mutationWorker(ctx, dg, &m, mutateFunc, c.logger, bar,
// uidMap, read, quit, done)
go mutationWorker(ctx, pkg, uidMap, read, quit, done)
}

// Send data to workers via channel.
Expand All @@ -47,10 +57,10 @@ func (c *Client) mutateMulti(ctx context.Context, dg *dgo.Dgraph,

close(read)

return launchWorkers(limit, &wg, bar, done, quit)
return launchWorkers(limit, bar, done, quit)
}

func launchWorkers(limit int, wg *sync.WaitGroup, bar *pb.ProgressBar,
func launchWorkers(limit int, bar *pb.ProgressBar,
done chan error, quit chan bool) error {

var err error
Expand All @@ -68,17 +78,14 @@ func launchWorkers(limit int, wg *sync.WaitGroup, bar *pb.ProgressBar,
}

// Wait for all the workers to finish.
wg.Wait()
bar.Finish()

return err
}

func mutationWorker(ctx context.Context, dg *dgo.Dgraph, wg *sync.WaitGroup,
m *sync.Mutex, mutateSingleStruct mutateSingle, logger yalp.Logger, bar *pb.ProgressBar,
func mutationWorker(ctx context.Context, pkg *workerPackage,
uidMap map[string]UID, read chan interface{}, quit chan bool, done chan error) {
// Defer that the waitgroup is finished.
defer wg.Done()
var err error

// For each signal received in read channel.
Expand All @@ -89,7 +96,7 @@ ReadLoop:
Forever:
for {
// MutateSingleStruct with received struct.
_, mutErr := mutateSingleStruct(ctx, dg, data, uidMap, m)
_, mutErr := pkg.mutateSingleStruct(ctx, pkg.dg, data, uidMap, pkg.m)

switch mutErr {
case nil:
Expand All @@ -105,7 +112,7 @@ ReadLoop:

// Increment the progress bar once the node is either successfully added
// or successfully updated.
bar.Increment()
pkg.bar.Increment()
}

// Mark done.
Expand Down
18 changes: 8 additions & 10 deletions mutate_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestLaunchWorkers(t *testing.T) {
)

g.It("should not error", func() {
g.Assert(launchWorkers(0, &sync.WaitGroup{}, &pb.ProgressBar{}, done, quit)).
g.Assert(launchWorkers(0, &pb.ProgressBar{}, done, quit)).
Equal(nil)
})
})
Expand All @@ -48,7 +48,6 @@ func TestMutationWorker(t *testing.T) {

g.Describe("mutation worker", func() {
var (
wg sync.WaitGroup
m sync.Mutex
mSS = NewClient(WithPredicateKey("username")).mutateSingleStruct
ctx = context.Background()
Expand All @@ -63,10 +62,11 @@ func TestMutationWorker(t *testing.T) {
read := make(chan interface{})
quit := make(chan bool)

wg.Add(1)
api.shouldAbort = false
// oof that's a lot of parameters...
go mutationWorker(ctx, dg, &wg, &m, mSS, logger, &pb.ProgressBar{}, uidMap, read, quit, done)
// Hello past self, don't worry I got your back covered.
pkg := &workerPackage{dg, &m, mSS, logger, &pb.ProgressBar{}}
go mutationWorker(ctx, pkg, uidMap, read, quit, done)

// So then the logging if statement passes.
time.Sleep(200 * time.Millisecond)
Expand All @@ -77,18 +77,18 @@ func TestMutationWorker(t *testing.T) {
read <- &testPersonCorrect

close(read)

wg.Wait()
})

g.It("should not error when old", func() {
read := make(chan interface{})
quit := make(chan bool)

wg.Add(1)
api.shouldAbort = true

// oof that's a lot of parameters...
go mutationWorker(ctx, dg, &wg, &m, mSS, logger, &pb.ProgressBar{}, uidMap, read, quit, done)
// Hello past self, don't worry I got your back covered.
pkg := &workerPackage{dg, &m, mSS, logger, &pb.ProgressBar{}}
go mutationWorker(ctx, pkg, uidMap, read, quit, done)

read <- &testPersonCorrect

Expand All @@ -101,8 +101,6 @@ func TestMutationWorker(t *testing.T) {
err := <-done

g.Assert(err).Equal(nil)

wg.Wait()
})
})
}
5 changes: 4 additions & 1 deletion testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func (t *testBuilder) String() string {
return t.stringOutput
}

func (*testBuilder) Reset() {}
// Reset is empty because testBuilder doesn't hold any information in it in the first place.
func (*testBuilder) Reset() {
// This function is just for testing.
}

type testDgraphClient struct {
queryUseCount int
Expand Down
2 changes: 0 additions & 2 deletions upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func (c *Client) tryUpsert(ctx context.Context, txn *dgo.Txn, dat *DupleNode) *u
if uid == "" {
new = true
// Insert new node.
// TODO remove string concatenation.
uidMap, err := setNode(ctx, txn, &builder, "_:"+identifier, dat)
if err != nil {
return &upsertResponse{
Expand All @@ -49,7 +48,6 @@ func (c *Client) tryUpsert(ctx context.Context, txn *dgo.Txn, dat *DupleNode) *u
}
} else {
// Update the found node.
// TODO remove string concatenation.
_, err = setNode(ctx, txn, &builder, "<"+uid+">", dat)
if err != nil {
return &upsertResponse{
Expand Down

0 comments on commit 98c9a9a

Please sign in to comment.