From c313f98bc483e417c80f989b121bfa944cfb5ffe Mon Sep 17 00:00:00 2001 From: "Dominik.Kotecki" Date: Sun, 31 Dec 2023 14:26:56 +0100 Subject: [PATCH] fix devto parser --- internal/parser/devto/parser.go | 65 ++++++++++----------------------- 1 file changed, 20 insertions(+), 45 deletions(-) diff --git a/internal/parser/devto/parser.go b/internal/parser/devto/parser.go index 3dd368c..e4afe7b 100644 --- a/internal/parser/devto/parser.go +++ b/internal/parser/devto/parser.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/dominikus1993/dev-news-bot/internal/parser/utils" "github.com/dominikus1993/dev-news-bot/pkg/model" jsoniter "github.com/json-iterator/go" log "github.com/sirupsen/logrus" @@ -67,56 +68,30 @@ func NewDevToParser(tags []string) *devtoParser { } } -func mapPostToArticle(sub *devtoresponse) []model.Article { +func streamArticles(sub *devtoresponse, stream chan<- model.Article) { tag := *sub - articles := make([]model.Article, len(tag)) - for i, post := range tag { - articles[i] = model.NewArticleWithContent(post.Title, post.URL, post.Description, source) + for _, post := range tag { + stream <- model.NewArticleWithContent(post.Title, post.URL, post.Description, source) } - return articles } -func (p *devtoParser) parseAll(ctx context.Context) chan []model.Article { - stream := make(chan []model.Article, 10) - go func() { - defer close(stream) - var wg sync.WaitGroup - for _, sub := range p.tags { - wg.Add(1) - go func(s string, wait *sync.WaitGroup) { - defer wg.Done() - res, err := parseTag(ctx, p.client, s) - if err != nil { - log.WithContext(ctx).WithError(err).WithField("tag", s).Errorln("Error while parsing tag") - } else { - stream <- mapPostToArticle(res) - } - }(sub, &wg) - } - wg.Wait() - }() - return stream -} - -func flatten(stream chan []model.Article) model.ArticlesStream { - result := make(chan model.Article, 100) - go func() { - defer close(result) - var wg sync.WaitGroup - for articles := range stream { - wg.Add(1) - go func(articless []model.Article, stream chan model.Article, wait *sync.WaitGroup) { - defer wait.Done() - for _, article := range articless { - stream <- article - } - }(articles, result, &wg) - } - wg.Wait() - }() - return result +func (p *devtoParser) parseArticles(ctx context.Context, stream chan<- model.Article) { + var wg sync.WaitGroup + for _, sub := range p.tags { + wg.Add(1) + go func(s string, wait *sync.WaitGroup, result chan<- model.Article) { + defer wg.Done() + res, err := parseTag(ctx, p.client, s) + if err != nil { + log.WithContext(ctx).WithError(err).WithField("tag", s).Errorln("Error while parsing tag") + } else { + streamArticles(res, result) + } + }(sub, &wg, stream) + } + wg.Wait() } func (p *devtoParser) Parse(ctx context.Context) model.ArticlesStream { - return flatten(p.parseAll(ctx)) + return utils.Parse(ctx, p.parseArticles) }