Skip to content

Commit

Permalink
use data pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikus1993 committed Feb 13, 2022
1 parent e1ee97d commit 799a251
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 46 deletions.
13 changes: 12 additions & 1 deletion pkg/model/article.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type Article struct {
Link string
}

type ArticlesStream <-chan Article

func NewArticleWithContent(title, link, content string) Article {
return Article{
Title: title,
Expand Down Expand Up @@ -43,10 +45,11 @@ func (a *Article) IsValid() bool {
return isUrl(a.Link) && contentIsValid && titleIsValid
}

func TakeRandomArticles(articles []Article, take int) []Article {
func TakeRandomArticles(stream ArticlesStream, take int) []Article {
if take == 0 {
return make([]Article, 0)
}
articles := ToArticlesArray(stream)
if take >= len(articles) {
return articles
}
Expand All @@ -58,3 +61,11 @@ func TakeRandomArticles(articles []Article, take int) []Article {

return randomArticles
}

// ToArticlesArray drains s and returns every received article in arrival
// order. It blocks until the stream is closed by its sender and always
// returns a non-nil slice, which is empty when nothing was received.
func ToArticlesArray(s ArticlesStream) []Article {
	collected := []Article{}
	for {
		article, open := <-s
		if !open {
			// Stream closed: everything sent so far has been collected.
			return collected
		}
		collected = append(collected, article)
	}
}
18 changes: 15 additions & 3 deletions pkg/model/article_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,31 @@ func TestArticleValidationWhenUrlIsCorrect(t *testing.T) {
}

// TestGetRandomArticlesWhenTakeIsZero checks that requesting zero articles
// yields an empty result even when the stream holds articles.
func TestGetRandomArticlesWhenTakeIsZero(t *testing.T) {
	input := []Article{NewArticle("x", "2"), NewArticle("d", "1"), NewArticle("xd", "37")}
	// Size the buffer from the fixture so every send succeeds without a reader.
	articles := make(chan Article, len(input))
	for _, a := range input {
		articles <- a
	}
	close(articles)
	randomArticles := TakeRandomArticles(articles, 0)
	assert.Len(t, randomArticles, 0)
}

// TestGetRandomArticlesWhenTakeIsGreaterThanLenOfArticlesArray checks that
// requesting more articles than the stream holds returns all of them.
func TestGetRandomArticlesWhenTakeIsGreaterThanLenOfArticlesArray(t *testing.T) {
	input := []Article{NewArticle("x", "2"), NewArticle("d", "1"), NewArticle("xd", "37")}
	// Size the buffer from the fixture so every send succeeds without a reader.
	articles := make(chan Article, len(input))
	for _, a := range input {
		articles <- a
	}
	close(articles)
	randomArticles := TakeRandomArticles(articles, 5)
	// Compare against the fixture slice, not the channel: TakeRandomArticles
	// drains the stream, so len(articles) is 0 by the time we get here.
	assert.Len(t, randomArticles, len(input))
}

// TestGetRandomArticlesWhenTakeIsSmallerThanLenOfArticlesArray checks that
// requesting fewer articles than the stream holds returns exactly that many.
func TestGetRandomArticlesWhenTakeIsSmallerThanLenOfArticlesArray(t *testing.T) {
	input := []Article{NewArticle("x", "2"), NewArticle("d", "1"), NewArticle("xd", "37")}
	// Size the buffer from the fixture so every send succeeds without a reader.
	articles := make(chan Article, len(input))
	for _, a := range input {
		articles <- a
	}
	close(articles)
	randomArticles := TakeRandomArticles(articles, 2)
	assert.Len(t, randomArticles, 2)
}
31 changes: 13 additions & 18 deletions pkg/providers/articles.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// ArticlesProvider is a source of articles delivered as a stream.
type ArticlesProvider interface {
	// Provide returns a receive-only stream of articles. The context is
	// passed through to the implementation so production can be cancelled.
	Provide(ctx context.Context) model.ArticlesStream
}

type articlesProvider struct {
Expand All @@ -26,7 +26,11 @@ func fanIn(ctx context.Context, stream ...chan model.Article) chan model.Article
out := make(chan model.Article)
output := func(c <-chan model.Article) {
for v := range c {
out <- v
select {
case <-ctx.Done():
return
case out <- v:
}
}
wg.Done()
}
Expand All @@ -49,31 +53,22 @@ func (f *articlesProvider) parse(ctx context.Context, parser parsers.ArticlesPar
log.WithError(err).WithContext(ctx).Error("Error while parsing articles")
} else {
for _, v := range res {
stream <- v
select {
case <-ctx.Done():
return
case stream <- v:
}
}
}
close(stream)
}()
return stream
}

func (f *articlesProvider) Provide(ctx context.Context) ([]model.Article, error) {
func (f *articlesProvider) Provide(ctx context.Context) model.ArticlesStream {
streams := make([]chan model.Article, 0, len(f.parsers))
result := make([]model.Article, 0)
for _, parser := range f.parsers {
streams = append(streams, f.parse(ctx, parser))
}
finalStream := fanIn(ctx, streams...)
for {
select {
case v, ok := <-finalStream:
if ok {
result = append(result, v)
} else {
return result, nil
}
case <-ctx.Done():
return result, nil
}
}
return fanIn(ctx, streams...)
}
6 changes: 2 additions & 4 deletions pkg/providers/articles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@ func (f *fakeErrorParser) Parse(ctx context.Context) ([]model.Article, error) {

// TestArticlesProvider checks that articles from every parser end up in the
// provided stream.
func TestArticlesProvider(t *testing.T) {
	articlesProvider := NewArticlesProvider([]parsers.ArticlesParser{&fakeParser{}, &fakeParser2{}})
	// Provide returns a stream, so collect it into a slice before asserting.
	subject := model.ToArticlesArray(articlesProvider.Provide(context.Background()))
	assert.Len(t, subject, 2)
	assert.Equal(t, "test", subject[0].Title)
	assert.Equal(t, "test", subject[1].Title)
}

// TestArticlesProviderWhenError checks that a failing parser does not stop
// articles from the remaining parser being provided.
func TestArticlesProviderWhenError(t *testing.T) {
	articlesProvider := NewArticlesProvider([]parsers.ArticlesParser{&fakeParser{}, &fakeErrorParser{}})
	// Provide returns a stream, so collect it into a slice before asserting.
	subject := model.ToArticlesArray(articlesProvider.Provide(context.Background()))
	assert.Len(t, subject, 1)
	assert.Equal(t, "test", subject[0].Title)
}
48 changes: 28 additions & 20 deletions pkg/usecase/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package usecase

import (
"context"
"sync"

"github.com/dominikus1993/dev-news-bot/pkg/model"
"github.com/dominikus1993/dev-news-bot/pkg/notifications"
Expand All @@ -20,42 +21,49 @@ func NewParseArticlesAndSendItUseCase(articlesProvider providers.ArticlesProvide
return &ParseArticlesAndSendItUseCase{articlesProvider: articlesProvider, repository: repository, broadcaster: broadcaster}
}

func (u *ParseArticlesAndSendItUseCase) filterNewArticles(ctx context.Context, articles []model.Article) []model.Article {
filteredArticles := make([]model.Article, 0, len(articles))
for _, article := range articles {
// pipe builds one pipeline stage. For every article received from articles
// it starts a goroutine that calls f with ctx, the article, and the stage's
// output channel; f decides whether to forward the article. The output
// channel (buffer of 10) is closed once the input is closed and every call
// to f has returned. Because each article is handled in its own goroutine,
// output order is not tied to input order.
func pipe(ctx context.Context, articles model.ArticlesStream, f func(ctx context.Context, article model.Article, articles chan<- model.Article)) model.ArticlesStream {
	out := make(chan model.Article, 10)
	go func() {
		defer close(out)

		var inFlight sync.WaitGroup
		handle := func(item model.Article) {
			f(ctx, item, out)
			inFlight.Done()
		}
		for next := range articles {
			inFlight.Add(1)
			go handle(next)
		}
		// Wait for all handlers before the deferred close runs, so no
		// handler can send on a closed channel.
		inFlight.Wait()
	}()
	return out
}

func (u *ParseArticlesAndSendItUseCase) filterNewArticles(ctx context.Context, articles model.ArticlesStream) model.ArticlesStream {
return pipe(ctx, articles, func(ctx context.Context, article model.Article, articles chan<- model.Article) {
isNew, err := u.repository.IsNew(ctx, article)
if err != nil {
log.WithField("ArticleLink", article.Link).WithError(err).WithContext(ctx).Error("error while checking if article exists")
}
if isNew {
filteredArticles = append(filteredArticles, article)
articles <- article
}
}
return filteredArticles
})
}

func (u *ParseArticlesAndSendItUseCase) filterValid(ctx context.Context, articles []model.Article) []model.Article {
filteredArticles := make([]model.Article, 0, len(articles))
for _, article := range articles {
func (u *ParseArticlesAndSendItUseCase) filterValid(ctx context.Context, articles model.ArticlesStream) model.ArticlesStream {
return pipe(ctx, articles, func(ctx context.Context, article model.Article, articles chan<- model.Article) {
if article.IsValid() {
filteredArticles = append(filteredArticles, article)
articles <- article
}
}
return filteredArticles
})
}

func (u *ParseArticlesAndSendItUseCase) Execute(ctx context.Context, articlesQuantity int) error {
articles, err := u.articlesProvider.Provide(ctx)
if err != nil {
return err
}
log.Infoln("Found articles:", len(articles))
articles := u.articlesProvider.Provide(ctx)
validArticles := u.filterValid(ctx, articles)
log.Infoln("Found valid articles:", len(validArticles))
newArticles := u.filterNewArticles(ctx, validArticles)
log.Infoln("Found new articles:", len(newArticles))
randomArticles := model.TakeRandomArticles(newArticles, articlesQuantity)
err = u.broadcaster.Broadcast(ctx, randomArticles)
err := u.broadcaster.Broadcast(ctx, randomArticles)
if err != nil {
return err
}
Expand Down

0 comments on commit 799a251

Please sign in to comment.