Skip to content

Commit

Permalink
improved error handling while loading topicsMap
Browse files Browse the repository at this point in the history
  • Loading branch information
Oluwawunmi committed Aug 12, 2024
1 parent c00aacf commit 32c7022
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 21 deletions.
10 changes: 7 additions & 3 deletions task/reindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func reindex(ctx context.Context, cfg *config.Config) error {
t := &Tracker{}
errChan := make(chan error, 1)

topicsMapChan := retrieveTopicsMap(ctx, cfg.TopicTaggingEnabled, cfg.ServiceAuthToken, topicClient)
topicsMapChan := retrieveTopicsMap(ctx, errChan, cfg.TopicTaggingEnabled, cfg.ServiceAuthToken, topicClient)

datasetChan, _ := extractDatasets(ctx, t, errChan, datasetClient, cfg.ServiceAuthToken, cfg.PaginationLimit)
editionChan, _ := retrieveDatasetEditions(ctx, t, datasetClient, datasetChan, cfg.ServiceAuthToken, cfg.MaxDatasetExtractions)
Expand Down Expand Up @@ -317,13 +317,17 @@ func transformZebedeeDoc(ctx context.Context, tracker *Tracker, errChan chan err
}
}

func retrieveTopicsMap(ctx context.Context, enabled bool, serviceAuthToken string, topicClient topicCli.Clienter) chan map[string]Topic {
func retrieveTopicsMap(ctx context.Context, errorChan chan error, enabled bool, serviceAuthToken string, topicClient topicCli.Clienter) chan map[string]Topic {
topicsMapChan := make(chan map[string]Topic, 1)

go func() {
defer close(topicsMapChan)
if enabled {
topicsMap := LoadTopicsMap(ctx, serviceAuthToken, topicClient)
topicsMap, err := LoadTopicsMap(ctx, serviceAuthToken, topicClient)
if err != nil {
errorChan <- err
return
}
topicsMapChan <- topicsMap
log.Info(ctx, "finished retrieving topics map", log.Data{"map_size": len(topicsMap)})
} else {
Expand Down
35 changes: 20 additions & 15 deletions task/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/ONSdigital/log.go/v2/log"
)

const tokenBearer = "Bearer "

type Topic struct {
ID string
Slug string
Expand All @@ -19,19 +21,18 @@ type Topic struct {

// LoadTopicsMap is a function to load the topic map in publishing (private) mode.
// This function talks to the dp-topic-api via its private endpoints to retrieve the root topic and its subtopic ids
// The data returned by the dp-topic-api is of type *models.PrivateSubtopics which is then transformed in this function for the controller
// If an error has occurred, this is captured in log.Error and then an empty map is returned
func LoadTopicsMap(ctx context.Context, serviceAuthToken string, topicClient topicCli.Clienter) map[string]Topic {
// The data returned by the dp-topic-api is of type *models.PrivateSubtopics which is then mapped to match the *Topic struct
func LoadTopicsMap(ctx context.Context, serviceAuthToken string, topicClient topicCli.Clienter) (map[string]Topic, error) {
processedTopics := make(map[string]struct{})
topicMap := make(map[string]Topic)

// get root topic from dp-topic-api
rootTopic, err := topicClient.GetRootTopicsPrivate(ctx, topicCli.Headers{ServiceAuthToken: "Bearer " + serviceAuthToken})
rootTopic, err := topicClient.GetRootTopicsPrivate(ctx, topicCli.Headers{ServiceAuthToken: tokenBearer + serviceAuthToken})
if err != nil {
log.Error(ctx, "failed to get root topic from topic-api", err, log.Data{
"req_headers": topicCli.Headers{},
})
return topicMap
return nil, err
}

// dereference root topics items to allow ranging through them
Expand All @@ -41,46 +42,47 @@ func LoadTopicsMap(ctx context.Context, serviceAuthToken string, topicClient top
} else {
err := errors.New("root topic private items is nil")
log.Error(ctx, "failed to dereference root topic items pointer", err)
return topicMap
return nil, err
}

// recursively process topics and their subtopics
for i := range rootTopicItems {
processTopic(ctx, serviceAuthToken, topicClient, rootTopicItems[i].ID, topicMap, processedTopics, "", "", 0)
if err := processTopic(ctx, serviceAuthToken, topicClient, rootTopicItems[i].ID, topicMap, processedTopics, "", "", 0); err != nil {
return nil, err
}
}

// Check if any topics were found
if len(topicMap) == 0 {
err := errors.New("root topic found, but no subtopics were returned")
log.Error(ctx, "No topics loaded into map - root topic found, but no subtopics were returned", err)
return nil, err
}
return topicMap
return topicMap, nil
}

func processTopic(ctx context.Context, serviceAuthToken string, topicClient topicCli.Clienter, topicID string, topicMap map[string]Topic, processedTopics map[string]struct{}, parentTopicID, parentTopicSlug string, depth int) {
func processTopic(ctx context.Context, serviceAuthToken string, topicClient topicCli.Clienter, topicID string, topicMap map[string]Topic, processedTopics map[string]struct{}, parentTopicID, parentTopicSlug string, depth int) error {
log.Info(ctx, "Processing topic at depth", log.Data{
"topic_id": topicID,
"depth": depth,
})

// Check if the topic has already been processed
if _, exists := processedTopics[topicID]; exists {
err := errors.New("topic already processed")
log.Error(ctx, "Skipping already processed topic", err, log.Data{
log.Info(ctx, "Skipping already processed topic", log.Data{
"topic_id": topicID,
"depth": depth,
})
return
}

// Get the topic details from the topic client
topic, err := topicClient.GetTopicPrivate(ctx, topicCli.Headers{ServiceAuthToken: "Bearer " + serviceAuthToken}, topicID)
topic, err := topicClient.GetTopicPrivate(ctx, topicCli.Headers{ServiceAuthToken: tokenBearer + serviceAuthToken}, topicID)
if err != nil {
log.Error(ctx, "failed to get topic details from topic-api", err, log.Data{
"topic_id": topicID,
"depth": depth,
})
return
return err
}

if topic != nil {
Expand All @@ -96,10 +98,13 @@ func processTopic(ctx context.Context, serviceAuthToken string, topicClient topi
// Process each subtopic recursively
if topic.Current.SubtopicIds != nil {
for _, subTopicID := range *topic.Current.SubtopicIds {
processTopic(ctx, serviceAuthToken, topicClient, subTopicID, topicMap, processedTopics, topicID, topic.Current.Slug, depth+1)
if err := processTopic(ctx, serviceAuthToken, topicClient, subTopicID, topicMap, processedTopics, topicID, topic.Current.Slug, depth+1); err != nil {
return err
}
}
}
}
return nil
}

func mapTopicModelToStruct(topic models.Topic, parentID, parentSlug string) Topic {
Expand Down
9 changes: 6 additions & 3 deletions task/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func TestLoadTopicsMap(t *testing.T) {
}

Convey("When LoadTopicsMap is called with enableTopicTagging is enabled and root topics are retrieved and processed successfully", t, func() {
topicMap := LoadTopicsMap(ctx, serviceAuthToken, mockClient)
topicMap, err := LoadTopicsMap(ctx, serviceAuthToken, mockClient)
So(err, ShouldBeNil)
So(len(topicMap), ShouldEqual, 3)
So(topicMap["economy"].ID, ShouldEqual, "6734")
So(topicMap["business"].ID, ShouldEqual, "1234")
Expand All @@ -107,7 +108,8 @@ func TestLoadTopicsMap(t *testing.T) {
}
},
}
topicMap := LoadTopicsMap(ctx, serviceAuthToken, mockClient)
topicMap, err := LoadTopicsMap(ctx, serviceAuthToken, mockClient)
So(err, ShouldNotBeNil)
So(len(topicMap), ShouldEqual, 0)
})

Expand All @@ -123,7 +125,8 @@ func TestLoadTopicsMap(t *testing.T) {
}, nil
},
}
topicMap := LoadTopicsMap(ctx, serviceAuthToken, mockClient)
topicMap, err := LoadTopicsMap(ctx, serviceAuthToken, mockClient)
So(err, ShouldNotBeNil)
So(len(topicMap), ShouldEqual, 0)
})
}

0 comments on commit 32c7022

Please sign in to comment.