From 130bd62548b7cf686936445894103bc25662cf7a Mon Sep 17 00:00:00 2001 From: AkashRajpurohit Date: Wed, 4 Dec 2024 18:25:44 +0530 Subject: [PATCH] feat: :sparkles: add concurrency support --- cmd/root.go | 19 ++++-- pkg/bitbucket/bitbucket.go | 29 +++------ pkg/config/config.go | 3 + pkg/config/validate.go | 5 ++ pkg/config/validate_test.go | 114 ++++++++++++++++++++++++++---------- pkg/forgejo/forgejo.go | 28 +++------ pkg/github/github.go | 28 +++------ pkg/gitlab/gitlab.go | 28 +++------ pkg/raw/raw.go | 25 ++------ pkg/sync/concurrency.go | 50 ++++++++++++++++ 10 files changed, 194 insertions(+), 135 deletions(-) create mode 100644 pkg/sync/concurrency.go diff --git a/cmd/root.go b/cmd/root.go index e13058f..0bc63d4 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -44,6 +44,20 @@ var rootCmd = &cobra.Command{ } } + // Backward Compatibility Starts + // TODO: Remove these before v1.0.0 release + // If concurrency is not set, set it to 5 + if cfg.Concurrency == 0 { + cfg.Concurrency = 5 + } + + // If clone_type is not set in the config file, set it to bare + if cfg.CloneType == "" { + cfg.CloneType = "bare" + } + + // Backward Compatibility Ends + // If backupDir option is passed in the command line, use that instead of the one in the config file if backupDir != "" { cfg.BackupDir = config.GetBackupDir(backupDir) @@ -54,11 +68,6 @@ var rootCmd = &cobra.Command{ cfg.Cron = cron } - // If no clone_type is not set in the config file, set it to bare - if cfg.CloneType == "" { - cfg.CloneType = "bare" - } - logger.Info("Config loaded from: ", config.GetConfigFile(cfgFile)) logger.Info("Validating config ⏳") diff --git a/pkg/bitbucket/bitbucket.go b/pkg/bitbucket/bitbucket.go index b79c92d..582a818 100644 --- a/pkg/bitbucket/bitbucket.go +++ b/pkg/bitbucket/bitbucket.go @@ -1,8 +1,6 @@ package bitbucket import ( - "sync" - "github.com/AkashRajpurohit/git-sync/pkg/config" "github.com/AkashRajpurohit/git-sync/pkg/helpers" "github.com/AkashRajpurohit/git-sync/pkg/logger" 
@@ -29,27 +27,16 @@ func (c BitbucketClient) Sync(cfg config.Config) error { return err } - logger.Info("Total repositories: ", len(repos)) - - var wg sync.WaitGroup - sem := make(chan struct{}, 10) // Concurrency of 10 - - for _, repo := range repos { - wg.Add(1) - go func(repo *bb.Repository) { - defer wg.Done() - sem <- struct{}{} - gitSync.CloneOrUpdateRepo(cfg.Workspace, repo.Name, cfg) - if cfg.IncludeWiki && repo.Has_wiki { - gitSync.SyncWiki(cfg.Workspace, repo.Name, cfg) - } - <-sem - }(repo) - } + gitSync.LogRepoCount(len(repos), "Bitbucket") - wg.Wait() - logger.Info("All repositories synced ✅") + gitSync.SyncReposWithConcurrency(cfg, repos, func(repo *bb.Repository) { + gitSync.CloneOrUpdateRepo(cfg.Workspace, repo.Name, cfg) + if cfg.IncludeWiki && repo.Has_wiki { + gitSync.SyncWiki(cfg.Workspace, repo.Name, cfg) + } + }) + gitSync.LogSyncComplete("Bitbucket") return nil } diff --git a/pkg/config/config.go b/pkg/config/config.go index e99370b..526a5ff 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -30,6 +30,7 @@ type Config struct { Cron string `mapstructure:"cron"` CloneType string `mapstructure:"clone_type"` RawGitURLs []string `mapstructure:"raw_git_urls"` + Concurrency int `mapstructure:"concurrency"` } func expandPath(path string) string { @@ -109,6 +110,7 @@ func SaveConfig(config Config, cfgFile string) error { viper.Set("cron", config.Cron) viper.Set("clone_type", config.CloneType) viper.Set("raw_git_urls", config.RawGitURLs) + viper.Set("concurrency", config.Concurrency) return viper.WriteConfig() } @@ -133,5 +135,6 @@ func GetInitialConfig() Config { BackupDir: GetBackupDir(""), CloneType: "bare", RawGitURLs: []string{}, + Concurrency: 5, } } diff --git a/pkg/config/validate.go b/pkg/config/validate.go index b94769d..64f0b71 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -47,6 +47,11 @@ func ValidateConfig(cfg Config) error { return fmt.Errorf("clone_type can only be `bare`, `full`, `mirror` or 
`shallow`") } + // Validate concurrency + if cfg.Concurrency < 1 || cfg.Concurrency > 20 { + return fmt.Errorf("concurrency must be between 1 and 20") + } + // Validate cron if provided if cfg.Cron != "" { _, err := cron.ParseStandard(cfg.Cron) diff --git a/pkg/config/validate_test.go b/pkg/config/validate_test.go index 2f81823..0701c42 100644 --- a/pkg/config/validate_test.go +++ b/pkg/config/validate_test.go @@ -57,6 +57,51 @@ func TestValidateConfig(t *testing.T) { cfg Config wantErr bool }{ + { + name: "Invalid Concurrency - Zero", + cfg: Config{ + BackupDir: "test", + CloneType: "bare", + Concurrency: 0, + Server: Server{ + Domain: "test", + Protocol: "https", + }, + Username: "test", + Token: "test", + }, + wantErr: true, + }, + { + name: "Invalid Concurrency - Negative", + cfg: Config{ + BackupDir: "test", + CloneType: "bare", + Concurrency: -1, + Server: Server{ + Domain: "test", + Protocol: "https", + }, + Username: "test", + Token: "test", + }, + wantErr: true, + }, + { + name: "Valid Concurrency - Custom", + cfg: Config{ + BackupDir: "test", + CloneType: "bare", + Concurrency: 10, + Server: Server{ + Domain: "test", + Protocol: "https", + }, + Username: "test", + Token: "test", + }, + wantErr: false, + }, { name: "Empty BackupDir", cfg: Config{ @@ -84,8 +129,9 @@ func TestValidateConfig(t *testing.T) { { name: "Valid Raw Git URLs Only", cfg: Config{ - BackupDir: "test", - CloneType: "bare", + BackupDir: "test", + CloneType: "bare", + Concurrency: 5, RawGitURLs: []string{ "https://github.com/user/repo1.git", "git@github.com:user/repo2.git", @@ -96,8 +142,9 @@ func TestValidateConfig(t *testing.T) { { name: "Invalid Raw Git URL", cfg: Config{ - BackupDir: "test", - CloneType: "bare", + BackupDir: "test", + CloneType: "bare", + Concurrency: 5, RawGitURLs: []string{ "https://github.com/user/repo1", // Missing .git "git@github.com:user/repo2.git", @@ -108,9 +155,10 @@ func TestValidateConfig(t *testing.T) { { name: "Empty Username with No Raw URLs", cfg: 
Config{ - BackupDir: "test", - CloneType: "bare", - Token: "test", + BackupDir: "test", + CloneType: "bare", + Concurrency: 5, + Token: "test", Server: Server{ Domain: "test", Protocol: "https", @@ -121,9 +169,10 @@ func TestValidateConfig(t *testing.T) { { name: "Empty Token with No Raw URLs", cfg: Config{ - Username: "test", - BackupDir: "test", - CloneType: "bare", + Username: "test", + BackupDir: "test", + CloneType: "bare", + Concurrency: 5, Server: Server{ Domain: "test", Protocol: "https", @@ -134,10 +183,11 @@ func TestValidateConfig(t *testing.T) { { name: "Empty Server Domain with No Raw URLs", cfg: Config{ - Username: "test", - Token: "test", - BackupDir: "test", - CloneType: "bare", + Username: "test", + Token: "test", + BackupDir: "test", + CloneType: "bare", + Concurrency: 5, Server: Server{ Protocol: "https", }, @@ -147,10 +197,11 @@ func TestValidateConfig(t *testing.T) { { name: "Invalid Server Protocol with No Raw URLs", cfg: Config{ - Username: "test", - Token: "test", - BackupDir: "test", - CloneType: "bare", + Username: "test", + Token: "test", + BackupDir: "test", + CloneType: "bare", + Concurrency: 5, Server: Server{ Domain: "test", Protocol: "ftp", @@ -161,10 +212,11 @@ func TestValidateConfig(t *testing.T) { { name: "Empty Workspace for Bitbucket with No Raw URLs", cfg: Config{ - Username: "test", - Token: "test", - BackupDir: "test", - CloneType: "bare", + Username: "test", + Token: "test", + BackupDir: "test", + CloneType: "bare", + Concurrency: 5, Server: Server{ Domain: "test", Protocol: "https", @@ -177,10 +229,11 @@ func TestValidateConfig(t *testing.T) { { name: "Valid Platform Config with No Raw URLs", cfg: Config{ - Username: "test", - Token: "test", - BackupDir: "test", - CloneType: "bare", + Username: "test", + Token: "test", + BackupDir: "test", + CloneType: "bare", + Concurrency: 5, Server: Server{ Domain: "test", Protocol: "https", @@ -191,10 +244,11 @@ func TestValidateConfig(t *testing.T) { { name: "Valid Mixed Config 
(Platform + Raw URLs)", cfg: Config{ - Username: "test", - Token: "test", - BackupDir: "test", - CloneType: "bare", + Username: "test", + Token: "test", + BackupDir: "test", + CloneType: "bare", + Concurrency: 5, Server: Server{ Domain: "test", Protocol: "https", diff --git a/pkg/forgejo/forgejo.go b/pkg/forgejo/forgejo.go index 9306b4b..1fb7bcc 100644 --- a/pkg/forgejo/forgejo.go +++ b/pkg/forgejo/forgejo.go @@ -2,7 +2,6 @@ package forgejo import ( "fmt" - "sync" fg "codeberg.org/mvdkleijn/forgejo-sdk/forgejo" "github.com/AkashRajpurohit/git-sync/pkg/config" @@ -39,27 +38,16 @@ func (c ForgejoClient) Sync(cfg config.Config) error { return err } - logger.Info("Total repositories: ", len(repos)) + gitSync.LogRepoCount(len(repos), "Forgejo") - var wg sync.WaitGroup - sem := make(chan struct{}, 10) // Concurrency of 10 - - for _, repo := range repos { - wg.Add(1) - go func(repo *fg.Repository) { - defer wg.Done() - sem <- struct{}{} - gitSync.CloneOrUpdateRepo(repo.Owner.UserName, repo.Name, cfg) - if cfg.IncludeWiki && repo.HasWiki { - gitSync.SyncWiki(repo.Owner.UserName, repo.Name, cfg) - } - <-sem - }(repo) - } - - wg.Wait() - logger.Info("All repositories synced ✅") + gitSync.SyncReposWithConcurrency(cfg, repos, func(repo *fg.Repository) { + gitSync.CloneOrUpdateRepo(repo.Owner.UserName, repo.Name, cfg) + if cfg.IncludeWiki && repo.HasWiki { + gitSync.SyncWiki(repo.Owner.UserName, repo.Name, cfg) + } + }) + gitSync.LogSyncComplete("Forgejo") return nil } diff --git a/pkg/github/github.go b/pkg/github/github.go index d80d665..9d0b51f 100644 --- a/pkg/github/github.go +++ b/pkg/github/github.go @@ -2,7 +2,6 @@ package github import ( "context" - "sync" "github.com/AkashRajpurohit/git-sync/pkg/config" "github.com/AkashRajpurohit/git-sync/pkg/helpers" @@ -38,27 +37,16 @@ func (c GitHubClient) Sync(cfg config.Config) error { return err } - logger.Info("Total repositories: ", len(repos)) + gitSync.LogRepoCount(len(repos), "GitHub") - var wg sync.WaitGroup - sem := 
make(chan struct{}, 10) // Concurrency of 10 - - for _, repo := range repos { - wg.Add(1) - go func(repo *gh.Repository) { - defer wg.Done() - sem <- struct{}{} - gitSync.CloneOrUpdateRepo(repo.GetOwner().GetLogin(), repo.GetName(), cfg) - if cfg.IncludeWiki && repo.GetHasWiki() { - gitSync.SyncWiki(repo.GetOwner().GetLogin(), repo.GetName(), cfg) - } - <-sem - }(repo) - } - - wg.Wait() - logger.Info("All repositories synced ✅") + gitSync.SyncReposWithConcurrency(cfg, repos, func(repo *gh.Repository) { + gitSync.CloneOrUpdateRepo(repo.GetOwner().GetLogin(), repo.GetName(), cfg) + if cfg.IncludeWiki && repo.GetHasWiki() { + gitSync.SyncWiki(repo.GetOwner().GetLogin(), repo.GetName(), cfg) + } + }) + gitSync.LogSyncComplete("GitHub") return nil } diff --git a/pkg/gitlab/gitlab.go b/pkg/gitlab/gitlab.go index 6ee7797..0c5eefd 100644 --- a/pkg/gitlab/gitlab.go +++ b/pkg/gitlab/gitlab.go @@ -2,7 +2,6 @@ package gitlab import ( "fmt" - "sync" "github.com/AkashRajpurohit/git-sync/pkg/config" "github.com/AkashRajpurohit/git-sync/pkg/helpers" @@ -33,27 +32,16 @@ func (c GitlabClient) Sync(cfg config.Config) error { return err } - logger.Info("Total projects: ", len(projects)) + gitSync.LogRepoCount(len(projects), "GitLab") - var wg sync.WaitGroup - sem := make(chan struct{}, 10) // Concurrency of 10 - - for _, project := range projects { - wg.Add(1) - go func(project *gl.Project) { - defer wg.Done() - sem <- struct{}{} - gitSync.CloneOrUpdateRepo(project.Namespace.FullPath, project.Path, cfg) - if cfg.IncludeWiki && project.WikiEnabled { - gitSync.SyncWiki(project.Namespace.FullPath, project.Path, cfg) - } - <-sem - }(project) - } - - wg.Wait() - logger.Info("All projects synced ✅") + gitSync.SyncReposWithConcurrency(cfg, projects, func(project *gl.Project) { + gitSync.CloneOrUpdateRepo(project.Namespace.FullPath, project.Path, cfg) + if cfg.IncludeWiki && project.WikiEnabled { + gitSync.SyncWiki(project.Namespace.FullPath, project.Path, cfg) + } + }) + 
gitSync.LogSyncComplete("GitLab") return nil } diff --git a/pkg/raw/raw.go b/pkg/raw/raw.go index 6b47f2e..cd445cb 100644 --- a/pkg/raw/raw.go +++ b/pkg/raw/raw.go @@ -3,10 +3,8 @@ package raw import ( "path/filepath" "strings" - "sync" "github.com/AkashRajpurohit/git-sync/pkg/config" - "github.com/AkashRajpurohit/git-sync/pkg/logger" gitSync "github.com/AkashRajpurohit/git-sync/pkg/sync" ) @@ -35,24 +33,13 @@ func (c RawClient) Sync(cfg config.Config) error { return nil } - logger.Info("Total raw repositories: ", len(cfg.RawGitURLs)) + gitSync.LogRepoCount(len(cfg.RawGitURLs), "raw") - var wg sync.WaitGroup - sem := make(chan struct{}, 10) // Concurrency of 10 - - for _, url := range cfg.RawGitURLs { - wg.Add(1) - go func(repoURL string) { - defer wg.Done() - sem <- struct{}{} - owner, name := c.extractRepoInfo(repoURL) - gitSync.CloneOrUpdateRawRepo(owner, name, repoURL, cfg) - <-sem - }(url) - } - - wg.Wait() - logger.Info("All raw repositories synced ✅") + gitSync.SyncWithConcurrency(cfg, cfg.RawGitURLs, func(url string) { + owner, name := c.extractRepoInfo(url) + gitSync.CloneOrUpdateRawRepo(owner, name, url, cfg) + }) + gitSync.LogSyncComplete("raw") return nil } diff --git a/pkg/sync/concurrency.go b/pkg/sync/concurrency.go new file mode 100644 index 0000000..09ac3f5 --- /dev/null +++ b/pkg/sync/concurrency.go @@ -0,0 +1,50 @@ +package sync + +import ( + "sync" + + "github.com/AkashRajpurohit/git-sync/pkg/config" + "github.com/AkashRajpurohit/git-sync/pkg/logger" +) + +func SyncWithConcurrency(cfg config.Config, repos []string, syncFn func(string)) { + var wg sync.WaitGroup + sem := make(chan struct{}, cfg.Concurrency) + + for _, repo := range repos { + wg.Add(1) + go func(r string) { + defer wg.Done() + sem <- struct{}{} + syncFn(r) + <-sem + }(repo) + } + + wg.Wait() +} + +func SyncReposWithConcurrency[T any](cfg config.Config, repos []T, syncFn func(T)) { + var wg sync.WaitGroup + sem := make(chan struct{}, cfg.Concurrency) + + for _, repo := range repos 
{ + wg.Add(1) + go func(r T) { + defer wg.Done() + sem <- struct{}{} + syncFn(r) + <-sem + }(repo) + } + + wg.Wait() +} + +func LogRepoCount(count int, repoType string) { + logger.Info("Total ", repoType, " repositories: ", count) +} + +func LogSyncComplete(repoType string) { + logger.Info("All ", repoType, " repositories synced ✅") +}