Skip to content
This repository has been archived by the owner on May 24, 2024. It is now read-only.

Commit

Permalink
Shard cache per half year (#95)
Browse files Browse the repository at this point in the history
* shard cache per half year

Signed-off-by: Ayman <enkhalifapro@gmail.com>

* clean up

Signed-off-by: Ayman <enkhalifapro@gmail.com>

* clean up code

Signed-off-by: Ayman <enkhalifapro@gmail.com>

* handle next year cache

Signed-off-by: Ayman <enkhalifapro@gmail.com>

* clean up

Signed-off-by: Ayman <enkhalifapro@gmail.com>

* clean up

Signed-off-by: Ayman <enkhalifapro@gmail.com>

* code clean up

Signed-off-by: Ayman <enkhalifapro@gmail.com>

---------

Signed-off-by: Ayman <enkhalifapro@gmail.com>
Co-authored-by: Ayman <enkhalifapro@gmail.com>
  • Loading branch information
khalifapro and enkhalifapro authored Apr 12, 2023
1 parent 2967014 commit be8da62
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 31 deletions.
85 changes: 57 additions & 28 deletions cmd/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ const (
// Success status
Success = "success"
// GitConnector ...
GitConnector = "git-connector"
PackSize = 1000
HotRepoCount = 50000
GitConnector = "git-connector"
PackSize = 1000
HotRepoCount = 50000
YearFirstHalf = "first-half"
YearSecondHalf = "second-half"
)

var (
Expand Down Expand Up @@ -524,16 +526,18 @@ var (
// GitTrailerPPAuthors - trailer name to authors map (for pair programming)
GitTrailerPPAuthors = map[string]string{"Signed-off-by": "authors_signed_off", "Co-authored-by": "co_authors"}
// max upstream date
gMaxUpstreamDt time.Time
gMaxUpstreamDtMtx = &sync.Mutex{}
cachedCommits = make(map[string]CommitCache)
commitsCacheFile = "commits-cache.csv"
createdCommits = make(map[string]bool)
IsHotRep = false
CommitsByYearCacheFile = "commits-cache-%s.csv"
CommitsUpdateCacheFile = "commits-update-cache.csv"
CurrentCacheYear = 1970
CachedCommitsUpdates = make(map[string]CommitCache)
gMaxUpstreamDt time.Time
gMaxUpstreamDtMtx = &sync.Mutex{}
cachedCommits = make(map[string]CommitCache)
commitsCacheFile = "commits-cache.csv"
createdCommits = make(map[string]bool)
IsHotRep = false
CommitsByYearCacheFile = "commits-cache-%s.csv"
CommitsUpdateCacheFile = "commits-update-cache.csv"
CurrentCacheYear = 1970
CachedCommitsUpdates = make(map[string]CommitCache)
CommitsByYearHalfCacheFile = "commits-cache-%s-%s.csv"
CurrentCacheYearHalf = YearFirstHalf
)

// Publisher - for streaming data to Kinesis
Expand Down Expand Up @@ -1869,7 +1873,7 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
return
}
} else {
if err = j.createYearCacheFile(commits, path); err != nil {
if err = j.createYearHalfCacheFile(commits, path); err != nil {
return
}
}
Expand Down Expand Up @@ -3160,7 +3164,10 @@ func (j *DSGit) SyncV2(ctx *shared.Ctx) (err error) {
if commitsCount >= HotRepoCount {
IsHotRep = true
CurrentCacheYear = from.Year()
j.getYearCache(lastSync)
if int(from.Month()) > 6 {
CurrentCacheYearHalf = YearSecondHalf
}
j.getYearHalfCache(lastSync)
j.getUpdateCache(lastSync)
} else {
j.getCache(lastSync)
Expand Down Expand Up @@ -3420,14 +3427,15 @@ func (j *DSGit) createCacheFile(cache []CommitCache, path string) error {
return nil
}

func (j *DSGit) createYearCacheFile(cache []CommitCache, path string) error {
nextYearCache := make([]CommitCache, 0)
func (j *DSGit) createYearHalfCacheFile(cache []CommitCache, path string) error {
nextYearHalfCache := make([]CommitCache, 0)
for _, comm := range cache {
comm.FileLocation = path
if comm.CommitDate.Year() == CurrentCacheYear {
commitYearHalf := getDateYearHalf(comm.CommitDate)
if comm.CommitDate.Year() == CurrentCacheYear && commitYearHalf == CurrentCacheYearHalf {
cachedCommits[comm.EntityID] = comm
} else {
nextYearCache = append(nextYearCache, comm)
nextYearHalfCache = append(nextYearHalfCache, comm)
}
}
records := [][]string{
Expand All @@ -3438,7 +3446,7 @@ func (j *DSGit) createYearCacheFile(cache []CommitCache, path string) error {
}

yearSTR := strconv.Itoa(CurrentCacheYear)
cacheFile := fmt.Sprintf(CommitsByYearCacheFile, yearSTR)
cacheFile := fmt.Sprintf(CommitsByYearHalfCacheFile, yearSTR, CurrentCacheYearHalf)
csvFile, err := os.Create(cacheFile)
if err != nil {
return err
Expand All @@ -3453,7 +3461,6 @@ func (j *DSGit) createYearCacheFile(cache []CommitCache, path string) error {
if err != nil {
return err
}
cachedCommits = make(map[string]CommitCache)
err = j.cacheProvider.UpdateMultiPartFileByKey(j.endpoint, cacheFile)
if err != nil {
return err
Expand All @@ -3463,18 +3470,40 @@ func (j *DSGit) createYearCacheFile(cache []CommitCache, path string) error {
if err != nil {
return err
}
loadCacheToMemory(records)
if len(nextYearCache) > 0 {
CurrentCacheYear = nextYearCache[0].CommitDate.Year()
if err = j.createYearCacheFile(nextYearCache, path); err != nil {
if len(nextYearHalfCache) > 0 {
//CurrentCacheYear = nextYearHalfCache[0].CommitDate.Year()
updateYearHalf(nextYearHalfCache[0].CommitDate)
if err = j.createYearHalfCacheFile(nextYearHalfCache, path); err != nil {
return err
}
cachedCommits = make(map[string]CommitCache)
j.getYearCache(os.Getenv("LAST_SYNC"))
j.getYearHalfCache(os.Getenv("LAST_SYNC"))
}
return nil
}

func getDateYearHalf(commitDate time.Time) string {
monthNumber := int(commitDate.Month())
if monthNumber > 6 {
return YearSecondHalf
}
return YearFirstHalf
}

func updateYearHalf(commitDate time.Time) {
cuHalf := getDateYearHalf(commitDate)
if cuHalf == CurrentCacheYearHalf {
return
}

if CurrentCacheYearHalf == YearFirstHalf {
CurrentCacheYearHalf = YearSecondHalf
return
}
CurrentCacheYearHalf = YearFirstHalf
CurrentCacheYear += 1
}

func (j *DSGit) createUpdateCacheFile(cache []CommitCache, path string) error {
for _, comm := range cache {
comm.FileLocation = path
Expand Down Expand Up @@ -3631,9 +3660,9 @@ func (j *DSGit) getUpdateCache(lastSync string) {
}
}

func (j *DSGit) getYearCache(lastSync string) {
func (j *DSGit) getYearHalfCache(lastSync string) {
yearSTR := strconv.Itoa(CurrentCacheYear)
commentBytes, err := j.cacheProvider.GetFileByKey(j.endpoint, fmt.Sprintf(CommitsByYearCacheFile, yearSTR))
commentBytes, err := j.cacheProvider.GetFileByKey(j.endpoint, fmt.Sprintf(CommitsByYearHalfCacheFile, yearSTR, CurrentCacheYearHalf))
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/LF-Engineering/insights-datasource-git
go 1.17

require (
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230410030513-945f1d5a92a4
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230411073313-68b5e7a0b0ef
github.com/LF-Engineering/lfx-event-schema v0.1.37
github.com/aws/aws-lambda-go v1.27.1
github.com/aws/aws-sdk-go v1.42.25
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230410030513-945f1d5a92a4 h1:q9DZVrh19QzM0s6EEbAHFkf51Ubw4jw5g1qlKASlHvc=
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230410030513-945f1d5a92a4/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8=
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230411073313-68b5e7a0b0ef h1:Mwv6SkvJgLQi2/jdJCSWbjG/CFolOiQtRb3Ydhb4Oe8=
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230411073313-68b5e7a0b0ef/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8=
github.com/LF-Engineering/lfx-event-schema v0.1.14/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=
github.com/LF-Engineering/lfx-event-schema v0.1.37 h1:ny46D2NdCXokvJZ01GJcw2RfQM64ousJjaYsrRj5zzg=
github.com/LF-Engineering/lfx-event-schema v0.1.37/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=
Expand Down

0 comments on commit be8da62

Please sign in to comment.