Skip to content

Commit

Permalink
rewrite with uwe
Browse files Browse the repository at this point in the history
  • Loading branch information
DmytryiLaukhin committed Sep 12, 2019
1 parent e42995c commit 6fec373
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 223 deletions.
47 changes: 20 additions & 27 deletions app/api/service.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package api

import (
"context"
"fmt"
"net/http"
"time"

Expand All @@ -20,25 +18,26 @@ import (
"github.com/lancer-kit/sender/repo/providers/sms/twilio"
"github.com/lancer-kit/sender/repo/providers/sms/viber"
"github.com/lancer-kit/sender/repo/providers/sms/whatsapp"
"github.com/lancer-kit/uwe/v2"
"github.com/lancer-kit/uwe/v2/presets/api"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type Service struct {
ctx context.Context
cfg *config.Cfg
logger *logrus.Entry

emailSender email.Sender
smsSenders map[sms.Provider]smsp.Sender
}

func New(ctx context.Context, cfg *config.Cfg, logger *logrus.Entry) *Service {
func New(cfg *config.Cfg, logger *logrus.Entry) *Service {
return &Service{
logger: logger.WithField("worker", config.WorkerAPIServer),
ctx: ctx,
cfg: cfg,
}

}

func (s *Service) Init() error {
Expand All @@ -53,26 +52,20 @@ func (s *Service) Init() error {
return nil
}

func (s *Service) Run(errChan chan<- error) {
router := s.router(s.logger, s.cfg.API.APIRequestTimeout)
addr := fmt.Sprintf("%s:%d", s.cfg.API.Host, s.cfg.API.Port)
server := &http.Server{Addr: addr, Handler: router}

go func() {
s.logger.Info("Starting API Service at: ", addr)

if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
errChan <- errors.Wrap(err, "server failed")
}
}()

<-s.ctx.Done()
s.logger.Info("Shutting down the API Service...")
if err := server.Shutdown(s.ctx); err != nil {
errChan <- errors.Wrap(err, "shutdown failed")
func (s *Service) Run(ctx uwe.Context) error {
server := api.NewServer(
api.Config{
Host: s.cfg.API.Host,
Port: s.cfg.API.Port,
},
s.router(s.cfg.API.APIRequestTimeout),
)
s.logger.Info("Starting API Service")
if err := server.Run(ctx); err != nil {
return err
}

s.logger.Info("API Service gracefully stopped")
s.logger.Info("API gracefully stopped")
return nil
}

func (s *Service) initSMSProviders() {
Expand All @@ -99,10 +92,10 @@ func (s *Service) initEmailProvider() {
}
}

func (s *Service) router(logger *logrus.Entry, requestTimeout int) chi.Router {
func (s *Service) router(requestTimeout int) http.Handler {
r := chi.NewRouter()

r.Use(log.NewRequestLogger(logger.Logger))
r.Use(log.NewRequestLogger(s.logger.Logger))
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(middleware.Recoverer)
Expand All @@ -116,7 +109,7 @@ func (s *Service) router(logger *logrus.Entry, requestTimeout int) chi.Router {

r.Route("/v1/sender", func(r chi.Router) {
h := &handler{
logger: logger,
logger: s.logger,
smsSenders: s.smsSenders,
emailSender: s.emailSender,
}
Expand Down
117 changes: 38 additions & 79 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,104 +1,63 @@
package app

import (
"context"
"os"
"os/signal"
"sync"
"syscall"

"github.com/lancer-kit/armory/log"
"github.com/lancer-kit/armory/natsx"
"github.com/lancer-kit/sender/app/api"
"github.com/lancer-kit/sender/app/asyncapi"
"github.com/lancer-kit/sender/config"
"github.com/lancer-kit/uwe/v2"
"github.com/sirupsen/logrus"
)

type App struct {
cfg *config.Cfg
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup
logger *logrus.Entry
workers WorkerList
errors map[string]chan error
cfg *config.Cfg
logger *logrus.Entry
}

func New(cfg *config.Cfg) *App {
ctx, cancel := context.WithCancel(context.Background())
logger := log.Default.WithField("app", config.ServiceName)
workers := WorkerList{
config.WorkerAPIServer: api.New(ctx, cfg, logger),
config.WorkerAsyncAPIEmail: asyncapi.NewEmail(ctx, cfg, logger),
config.WorkerAsyncAPISms: asyncapi.NewSms(ctx, cfg, logger),
}
return &App{
cfg: cfg,
workers: workers,
logger: logger,
cancel: cancel,
ctx: ctx,
wg: new(sync.WaitGroup),
}
a := new(App)
a.cfg = cfg
a.logger = log.Default.WithField("app", config.ServiceName)
return a
}

func (a *App) Run() {
a.errors = make(map[string]chan error)
for name := range a.workers {
a.errors[name] = make(chan error)
}

natsx.SetConfig(a.cfg.NATS)

a.workers.RunAll(a.cfg.Workers, a.errors)

go a.checkWorkerErrors(a.errors)

a.GracefulStop()
}

func (a *App) GracefulStop() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

<-signalChan

a.logger.Logger.Debugln("Begin graceful shutdown...")
a.cancel()
a.wg.Wait()
a.logger.Logger.Debugln("Service successfully stopped")
}

func (a App) Context() context.Context {
return a.ctx
}

func (a App) Logger() *logrus.Entry {
return a.logger
}

func (a *App) WAdd(delta int) {
a.wg.Add(delta)
}

func (a *App) WDone() {
a.wg.Done()
}

func (a *App) checkWorkerErrors(errors map[string]chan error) {
for name, err := range errors {
go a.logErrors(name, err)
chief := uwe.NewChief()
workers := a.workers()
for _, name := range a.cfg.Workers {
chief.AddWorker(uwe.WorkerName(name), workers[uwe.WorkerName(name)])
}
chief.UseDefaultRecover()
chief.SetEventHandler(a.eventHandler)
chief.Run()
}

func (a *App) workers() map[uwe.WorkerName]uwe.Worker {
return map[uwe.WorkerName]uwe.Worker{
config.WorkerAPIServer: api.New(
a.cfg,
a.logger.WithField("worker", config.WorkerAPIServer),
),
config.WorkerAsyncAPIEmail: asyncapi.NewEmail(
a.cfg,
a.logger.WithField("worker", config.WorkerAsyncAPIEmail),
),
config.WorkerAsyncAPISms: asyncapi.NewSms(
a.cfg,
a.logger.WithField("worker", config.WorkerAsyncAPISms),
),
}
<-a.Context().Done()
}

func (a *App) logErrors(name string, errStream chan error) {
for err := range errStream {
if err != nil {
a.Logger().WithError(err).
WithField("worker", name).
Errorln("error from worker")
}
func (a *App) eventHandler(event uwe.Event) {
logger := a.logger.WithField("level", event.Level).WithField("worker", event.Worker)
if err := event.ToError(); err != nil {
logger.WithError(err).Error("error in chief")
return
}

logger.WithField("message", event.Message).Info("chief log")
}
27 changes: 11 additions & 16 deletions app/asyncapi/email.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,51 @@
package asyncapi

import (
"context"
"encoding/json"

"github.com/lancer-kit/sender/config"

"github.com/lancer-kit/armory/natsx"
"github.com/lancer-kit/sender/config"
"github.com/lancer-kit/sender/models/email"
emailp "github.com/lancer-kit/sender/repo/providers/email"
"github.com/lancer-kit/sender/repo/providers/email/mailgun"
"github.com/lancer-kit/sender/repo/providers/email/sendgrid"
"github.com/lancer-kit/sender/repo/providers/email/smtp"
"github.com/lancer-kit/uwe/v2"
"github.com/nats-io/go-nats"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type EmailService struct {
ctx context.Context
cfg *config.Cfg
logger *logrus.Entry
sender emailp.Sender
}

func NewEmail(ctx context.Context, cfg *config.Cfg, logger *logrus.Entry) *EmailService {
func NewEmail(cfg *config.Cfg, logger *logrus.Entry) uwe.Worker {
return &EmailService{
logger: logger.WithField("worker", config.WorkerAsyncAPIEmail),
cfg: cfg,
ctx: ctx,
}
}

func (s *EmailService) Init() error {
if s.initEmailProvider(); s.sender == nil {
return errors.New("sms providers does not set")
return errors.New("email providers does not set")
}

return nil
}

func (s *EmailService) Run(errChan chan<- error) {
func (s *EmailService) Run(ctx uwe.Context) error {
bus := make(chan *nats.Msg)
sub, err := natsx.Subscribe(email.Topic, bus)
if err != nil {
errChan <- errors.Wrap(err, "unable to open subscription")
return
return errors.Wrap(err, "unable to open subscription")
}
defer func() {
if err = sub.Unsubscribe(); err != nil {
errChan <- errors.Wrap(err, "unable to unsubscribe")
s.logger.WithError(err).Info("unable to unsubscribe")
}
}()

Expand All @@ -64,16 +60,15 @@ func (s *EmailService) Run(errChan chan<- error) {
s.logger.Debug("got new email message")

if err = s.processMsg(msg.Data); err != nil {
errChan <- errors.Wrap(err, "msg processing failed")
s.logger.WithError(err).Error("message processing failed")
continue
}

s.logger.Debug("email was sent")

case <-s.ctx.Done():
s.logger.Info("email async api gracefully stopped")
return

case <-ctx.Done():
s.logger.Info("Async-API-Email gracefully stopped")
return nil
}
}
}
Expand Down
Loading

0 comments on commit 6fec373

Please sign in to comment.