Skip to content

Commit

Permalink
add maxlag
Browse files Browse the repository at this point in the history
  • Loading branch information
rucciva committed Nov 20, 2023
1 parent e7a1afd commit 08f3cc7
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 10 deletions.
26 changes: 16 additions & 10 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,26 @@ import (
ltsregion "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/lts/v2/region"
)

var maxEndFromNow = strToDurationOrDefault(os.Getenv("HWC_LOGSTREAM_MAX_END_FROM_NOW"), time.Minute)
var maxFetchRange = strToDurationOrDefault(os.Getenv("HWC_LOGSTREAM_MAX_FETCH_RANGE"), 5*time.Minute)
var minFetchRange = strToDurationOrDefault(os.Getenv("HWC_LOGSTREAM_MIN_FETCH_RANGE"), time.Minute)
var streamRoutine = int(strToIntOrDefault(os.Getenv("HWC_LOGSTREAM_ROUTINE"), 5))
var streamPosTag = envOrDefault("HWC_LOGSTREAM_POSITITION_TAG", "x-hwc-logstream-pos")
var streamExclusionTag = envOrDefault("HWC_LOGSTREAM_EXCLUSION_TAG", "x-hwc-logstream-exclude")
var regionID = envOrDefault("HUAWEICLOUD_SDK_REGION_ID", "ap-southeast-4")

var addRegion = map[string]*region.Region{
var (
maxEndFromNow = strToDurationOrDefault(os.Getenv("HWC_LOGSTREAM_MAX_END_FROM_NOW"), time.Minute)
maxFetchRange = strToDurationOrDefault(os.Getenv("HWC_LOGSTREAM_MAX_FETCH_RANGE"), 5*time.Minute)
minFetchRange = strToDurationOrDefault(os.Getenv("HWC_LOGSTREAM_MIN_FETCH_RANGE"), time.Minute)
maxLag = strToDurationOrDefault(os.Getenv("HWC_LOGSTREAM_MAX_LAG"), 24*time.Hour)

streamRoutine = int(strToIntOrDefault(os.Getenv("HWC_LOGSTREAM_ROUTINE"), 5))

streamPosTag = envOrDefault("HWC_LOGSTREAM_POSITITION_TAG", "x-hwc-logstream-pos")
streamExclusionTag = envOrDefault("HWC_LOGSTREAM_EXCLUSION_TAG", "x-hwc-logstream-exclude")

regionID = envOrDefault("HUAWEICLOUD_SDK_REGION_ID", "ap-southeast-4")
)

var additionalRegion = map[string]*region.Region{
"ap-southeast-4": region.NewRegion("ap-southeast-4", "https://lts.ap-southeast-4.myhuaweicloud.com"),
}

func regionFromEnv() *region.Region {
if r, ok := addRegion[regionID]; ok {
if r, ok := additionalRegion[regionID]; ok {
return r
}

Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func main() {
maxEndFromNow: maxEndFromNow,
maxFetchRange: maxFetchRange,
minFetchRange: minFetchRange,
maxLag: maxLag,
}
errc <- mgr.Start(ctx, streamRoutine)
}()
Expand Down
3 changes: 3 additions & 0 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type LogstreamManager struct {
maxEndFromNow time.Duration
maxFetchRange time.Duration
minFetchRange time.Duration
maxLag time.Duration
}

func (m LogstreamManager) Start(ctx context.Context, routine int) (err error) {
Expand Down Expand Up @@ -69,6 +70,8 @@ func (m LogstreamManager) Start(ctx context.Context, routine int) (err error) {
continue
}

s.SkiptoCatchUp(m.maxLag)

end := time.Now().Add(-m.maxEndFromNow)
if e := s.start.Add(m.maxFetchRange); e.Before(end) {
end = e
Expand Down
8 changes: 8 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,11 @@ func (s *Logstream) SavePositition() (err error) {
s.start = s.last
return
}

func (s *Logstream) SkiptoCatchUp(maxlag time.Duration) {
now := time.Now()
if now.Sub(s.start) <= maxlag {
return
}
s.start = now.Add(-maxlag)
}

0 comments on commit 08f3cc7

Please sign in to comment.