Skip to content

Commit

Permalink
fix devto parser
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikus1993 committed Dec 31, 2023
1 parent 7bfe763 commit c313f98
Showing 1 changed file with 20 additions and 45 deletions.
65 changes: 20 additions & 45 deletions internal/parser/devto/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/dominikus1993/dev-news-bot/internal/parser/utils"
"github.com/dominikus1993/dev-news-bot/pkg/model"
jsoniter "github.com/json-iterator/go"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -67,56 +68,30 @@ func NewDevToParser(tags []string) *devtoParser {
}
}

func mapPostToArticle(sub *devtoresponse) []model.Article {
func streamArticles(sub *devtoresponse, stream chan<- model.Article) {
tag := *sub
articles := make([]model.Article, len(tag))
for i, post := range tag {
articles[i] = model.NewArticleWithContent(post.Title, post.URL, post.Description, source)
for _, post := range tag {
stream <- model.NewArticleWithContent(post.Title, post.URL, post.Description, source)
}
return articles
}

func (p *devtoParser) parseAll(ctx context.Context) chan []model.Article {
stream := make(chan []model.Article, 10)
go func() {
defer close(stream)
var wg sync.WaitGroup
for _, sub := range p.tags {
wg.Add(1)
go func(s string, wait *sync.WaitGroup) {
defer wg.Done()
res, err := parseTag(ctx, p.client, s)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("tag", s).Errorln("Error while parsing tag")
} else {
stream <- mapPostToArticle(res)
}
}(sub, &wg)
}
wg.Wait()
}()
return stream
}

func flatten(stream chan []model.Article) model.ArticlesStream {
result := make(chan model.Article, 100)
go func() {
defer close(result)
var wg sync.WaitGroup
for articles := range stream {
wg.Add(1)
go func(articless []model.Article, stream chan model.Article, wait *sync.WaitGroup) {
defer wait.Done()
for _, article := range articless {
stream <- article
}
}(articles, result, &wg)
}
wg.Wait()
}()
return result
func (p *devtoParser) parseArticles(ctx context.Context, stream chan<- model.Article) {
var wg sync.WaitGroup
for _, sub := range p.tags {
wg.Add(1)
go func(s string, wait *sync.WaitGroup, result chan<- model.Article) {
defer wg.Done()
res, err := parseTag(ctx, p.client, s)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("tag", s).Errorln("Error while parsing tag")
} else {
streamArticles(res, result)
}
}(sub, &wg, stream)
}
wg.Wait()
}

func (p *devtoParser) Parse(ctx context.Context) model.ArticlesStream {
return flatten(p.parseAll(ctx))
return utils.Parse(ctx, p.parseArticles)
}

0 comments on commit c313f98

Please sign in to comment.