diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index bd370974..639892e4 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -4,12 +4,10 @@ on: pull_request: branches: - master - - dev push: branches: - master - - dev jobs: build: @@ -17,15 +15,10 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - clickhouse: - - '1.1.54390' - - '19.11.12.69' - - '19.15.3.6' - - '19.16.19.85' - - '20.3' - - '20.8' - - '21.3' - - '21.8' + golang-version: + - "1.17" + outputs: + GCS_TESTS: ${{ steps.secrets.outputs.GCS_TESTS }} steps: - name: Checkout project uses: actions/checkout@v2 @@ -34,8 +27,7 @@ jobs: id: setup-go uses: actions/setup-go@v2 with: - go-version: '^1.17' - + go-version: '^${{ matrix.golang-version }}' - name: Cache golang id: cache-golang @@ -53,27 +45,18 @@ jobs: if: | steps.cache-golang.outputs.cache-hit != 'true' - - name: Setup docker-compose - run: | - sudo apt-get update - sudo apt-get install --no-install-recommends -y make python3-pip - sudo python3 -m pip install -U pip - sudo pip3 install --prefer-binary -U setuptools - sudo pip3 install --prefer-binary -U docker-compose - - name: Extract DOCKER_TAG version - id: docker_tag + - name: Build clickhouse-backup binary + id: make + env: + GOROOT: ${{ env.GOROOT_1_17_X64 }} run: | - DOCKER_TAG=${GITHUB_REF##*/} - export DOCKER_TAG=${DOCKER_TAG##*\\} - echo "::set-output name=docker_tag::${DOCKER_TAG:-dev}" + make build-race + make config + make test - - run: make build - - run: make config - - run: make test - - # be carefull with encrypt with old OpenSSL - https://habr.com/ru/post/535140/ + # be careful with encrypt with old OpenSSL - https://habr.com/ru/post/535140/ - name: Decrypting credentials for Google Cloud Storage id: secrets env: @@ -85,31 +68,115 @@ jobs: fi echo "::set-output name=GCS_TESTS::$(if [ -z "${{ secrets.VAULT_PASSWORD }}" ]; then echo "false"; else echo "true"; fi)" + - uses: actions/upload-artifact@v2 + with: + name: 
build-gcp-credentials + path: | + test/integration/credentials.json + if-no-files-found: error + if: | + steps.secrets.outputs.GCS_TESTS == 'true' + + - uses: actions/upload-artifact@v2 + with: + name: build-artifacts + path: | + ./clickhouse-backup/clickhouse-backup + ./clickhouse-backup/clickhouse-backup-race + ChangeLog.md + if-no-files-found: error + test: + needs: build + name: Test + runs-on: ubuntu-latest + strategy: + matrix: + golang-version: + - "1.17" + clickhouse: + - '1.1.54390' + - '19.17' + - '20.3' + - '20.8' + - '21.3' + - '21.8' + steps: + - name: Checkout project + uses: actions/checkout@v2 + + - name: Setup golang + id: setup-go + uses: actions/setup-go@v2 + with: + go-version: '^${{ matrix.golang-version }}' + + - name: Cache golang + id: cache-golang + uses: actions/cache@v2 + with: + path: | + ~/go/pkg/mod + ~/.cache/go-build + key: ${{ runner.os }}-${{ matrix.golang-version }}-golang-${{ hashFiles('go.sum') }} + restore-keys: | + ${{ runner.os }}-${{ matrix.golang-version }}-golang- + + - uses: actions/download-artifact@v2 + with: + name: build-artifacts + + - uses: actions/download-artifact@v2 + with: + name: build-gcp-credentials + if: | + needs.build.outputs.GCS_TESTS == 'true' + - name: Running integration tests env: CLICKHOUSE_VERSION: ${{ matrix.clickhouse }} - # LOG_LEVEL: debug + # RUN_TESTS: "TestIntegrationFTP" + # LOG_LEVEL: "debug" + # FTP_DEBUG: "true" CGO_ENABLED: 0 - GCS_TESTS: ${{ steps.secrets.outputs.GCS_TESTS }} + GCS_TESTS: ${{ needs.build.outputs.GCS_TESTS }} run: | set -x echo "CLICKHOUSE_VERSION=${CLICKHOUSE_VERSION}" echo "GCS_TESTS=${GCS_TESTS}" + + chmod +x $(pwd)/clickhouse-backup/clickhouse-backup* + if [[ "${CLICKHOUSE_VERSION}" == 2* ]]; then export COMPOSE_FILE=docker-compose_advanced.yml else export COMPOSE_FILE=docker-compose.yml fi - export CLICKHOUSE_BACKUP_BIN="$(pwd)/clickhouse-backup/clickhouse-backup" - docker-compose -f test/integration/${COMPOSE_FILE} down - docker volume prune -f - docker-compose -f 
test/integration/${COMPOSE_FILE} up -d --force-recreate + export CLICKHOUSE_BACKUP_BIN="$(pwd)/clickhouse-backup/clickhouse-backup-race" + docker-compose -f test/integration/${COMPOSE_FILE} up -d clickhouse docker-compose -f test/integration/${COMPOSE_FILE} ps -a - go test -failfast -tags=integration -v test/integration/integration_test.go + go test -timeout 30m -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v test/integration/integration_test.go + + docker: + needs: test + name: Docker + runs-on: ubuntu-latest + steps: + - name: Checkout project + uses: actions/checkout@v2 + + - uses: actions/download-artifact@v2 + with: + name: build-artifacts + + - name: Extract DOCKER_TAG version + id: docker_tag + run: | + DOCKER_TAG=${GITHUB_REF##*/} + export DOCKER_TAG=${DOCKER_TAG##*\\} + echo "::set-output name=docker_tag::${DOCKER_TAG:-dev}" - name: Building docker image env: - CLICKHOUSE_VERSION: ${{ matrix.clickhouse }} DOCKER_REPO: ${{ secrets.DOCKER_REPO }} DOCKER_IMAGE: ${{ secrets.DOCKER_IMAGE }} DOCKER_TOKEN: ${{ secrets.DOCKER_TOKEN }} @@ -117,7 +184,7 @@ jobs: DOCKER_REGISTRY: ${{ secrets.DOCKER_REGISTRY }} DOCKER_TAG: ${{ steps.docker_tag.outputs.docker_tag }} run: | - if [[ "${CLICKHOUSE_VERSION}" == "21.3" && "${DOCKER_TOKEN}" != "" ]]; then + if [[ "${DOCKER_TOKEN}" != "" ]]; then export DOCKER_REGISTRY=${DOCKER_REGISTRY:-docker.io} echo ${DOCKER_TOKEN} | docker login -u ${DOCKER_USER} --password-stdin ${DOCKER_REGISTRY} diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 842c225c..2e04d275 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -9,6 +9,11 @@ jobs: release: name: Release runs-on: ubuntu-latest + strategy: + matrix: + golang-version: + - "1.17" + steps: - name: Checkout project uses: actions/checkout@v2 @@ -17,11 +22,10 @@ jobs: id: setup-go uses: actions/setup-go@v2 with: - go-version: '^1.17' + go-version: '^${{ matrix.golang-version }}' - name: Setup fpm and make run: | - sudo 
apt-get update sudo apt-get install -y --no-install-recommends ruby ruby-dev gcc g++ rpm sudo apt-get install --no-install-recommends -y make sudo gem install --no-document fpm diff --git a/ChangeLog.md b/ChangeLog.md index 8afbe3f4..1b8f2a42 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,3 +1,34 @@ +# v1.2.0 + +INCOMPATIBLE CHANGES +- REST API `/backup/status` now return only latest executed command with status and error message + +IMPROVEMENTS +- Added REST API `/backup/list/local` and `/backup/list/remote` to allow list backup types separately +- Decreased background backup creation time via REST API `/backup/create`, during avoid list remote backups for update metrics value +- Decreased backup creation time, during avoid scan whole `system.tables` when set `table` query string parameter or `--tables` cli parameter +- Added `last` and `filter` query string parameters to REST API `/backup/actions`, to avoid pass to client long JSON documents +- Improved `FTP` remote storage parallel upload / download +- Added `FTP_CONCURRENCY` to allow, by default MAX_CPU / 2 +- Added `FTP_DEBUG` setting, to allow debug FTP commands +- Added `FTP` to CI/CD on any commit +- Added race condition check to CI/CD + +BUG FIXES +- environment variable `LOG_LEVEL` now apply to `clickhouse-backup server` properly +- fix [#280](https://github.com/AlexAkulov/clickhouse-backup/issues/280), incorrect prometheus metrics measurement for `/backup/create`, `/backup/upload`, `/backup/download` +- fix [#273](https://github.com/AlexAkulov/clickhouse-backup/issues/273), return `S3_PART_SIZE` back, but calculates it smartly +- fix [#252](https://github.com/AlexAkulov/clickhouse-backup/issues/252), now you can pass `last` and `filter` query string parameters +- fix [#246](https://github.com/AlexAkulov/clickhouse-backup/issues/246), incorrect error messages when use `REMOTE_STORAGE=none` +- fix [#283](https://github.com/AlexAkulov/clickhouse-backup/issues/283), properly handle error message from 
`FTP` server +- fix [#268](https://github.com/AlexAkulov/clickhouse-backup/issues/268), properly restore legacy backup for schema without database name + +# v1.1.1 + +BUG FIXES +- fix broken `system.backup_list` integration table after add `required field` in https://github.com/AlexAkulov/clickhouse-backup/pull/263 +- fix [#274](https://github.com/AlexAkulov/clickhouse-backup/issues/274) invalid `SFTP_PASSWORD` environment usage + # v1.1.0 IMPROVEMENTS @@ -12,7 +43,6 @@ IMPROVEMENTS - Added options for RBAC and CONFIGs backup, look to `clickhouse-backup help create` and `clickhouse-backup help restore` for details - Add `S3_CONCURRENCY` option to speedup backup upload to `S3` - Add `SFTP_CONCURRENCY` option to speedup backup upload to `SFTP` -- Add `--diff-from-remote` to `upload` command for avoid store local backup - Add `AZBLOB_USE_MANAGED_IDENTITY` support for ManagedIdentity for azure remote storage, thanks https://github.com/roman-vynar - Add clickhouse-operator kubernetes manifest which run `clickhouse-backup` in `server` mode on each clickhouse pod in kubernetes cluster - Add detailed description and restrictions for incremental backups. diff --git a/Makefile b/Makefile index 7f29a692..c4dee5a3 100644 --- a/Makefile +++ b/Makefile @@ -39,6 +39,7 @@ test: build: $(NAME)/$(NAME) + config: $(NAME)/config.yml $(NAME)/config.yml: $(NAME)/$(NAME) @@ -80,3 +81,8 @@ $(PKG_FILES): build/pkg -v $(VERSION) \ -p build \ build/pkg/=/ + +build-race: $(NAME)/$(NAME)-race + +$(NAME)/$(NAME)-race: $(GO_FILES) + CGO_ENABLED=1 $(GO_BUILD) -race -o $@ ./cmd/$(NAME) diff --git a/ReadMe.md b/ReadMe.md index 6b1226ad..c14fab51 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -220,8 +220,8 @@ Create new backup: `curl -s localhost:7171/backup/create -X POST | jq .` * Optional query argument `table` works the same as the `--table value` CLI argument. * Optional query argument `name` works the same as specifying a backup name with the CLI. 
* Optional query argument `schema` works the same the `--schema` CLI argument (backup schema only). -* Optional query argument `rbac` works the same the `--rbac` CLI argument (backup RBAC only). -* Optional query argument `configs` works the same the `--configs` CLI argument (backup configs only). +* Optional query argument `rbac` works the same the `--rbac` CLI argument (backup RBAC). +* Optional query argument `configs` works the same the `--configs` CLI argument (backup configs). * Full example: `curl -s 'localhost:7171/backup/create?table=default.billing&name=billing_test' -X POST` Note: this operation is async, so the API will return once the operation has been started. @@ -233,9 +233,11 @@ Upload backup to remote storage: `curl -s localhost:7171/backup/upload/ **GET /backup/list** +> **GET /backup/list/{where}** Print list of backups: `curl -s localhost:7171/backup/list | jq .` +Print list only local backups: `curl -s localhost:7171/backup/list/local | jq .` +Print list only remote backups: `curl -s localhost:7171/backup/list/remote | jq .` Note: The `Size` field is not populated for local backups. @@ -251,8 +253,8 @@ Create schema and restore data from backup: `curl -s localhost:7171/backup/resto * Optional query argument `table` works the same as the `--table value` CLI argument. * Optional query argument `schema` works the same the `--schema` CLI argument (restore schema only). * Optional query argument `data` works the same the `--data` CLI argument (restore data only). -* Optional query argument `rbac` works the same the `--rbac` CLI argument (restore RBAC only). -* Optional query argument `configs` works the same the `--configs` CLI argument (restore configs only). +* Optional query argument `rbac` works the same the `--rbac` CLI argument (restore RBAC). +* Optional query argument `configs` works the same the `--configs` CLI argument (restore configs). 
> **POST /backup/delete** @@ -262,7 +264,7 @@ Delete specific local backup: `curl -s localhost:7171/backup/delete/local/ **GET /backup/status** -Display list of current async operations: `curl -s localhost:7171/backup/status | jq .` +Display list of current running async operation: `curl -s localhost:7171/backup/status | jq .` > **POST /backup/actions** @@ -271,6 +273,8 @@ Execute multiple backup actions: `curl -X POST -d '{"command":"create test_backu > **GET /backup/actions** Display list of current async operations: `curl -s localhost:7171/backup/actions | jq .` +* Optional query argument `filter` could filter actions on server side. +* Optional query argument `last` could filter show only last `XX` actions. ## Storages diff --git a/cmd/clickhouse-backup/main.go b/cmd/clickhouse-backup/main.go index 06c35bd7..ed5d8b9f 100644 --- a/cmd/clickhouse-backup/main.go +++ b/cmd/clickhouse-backup/main.go @@ -58,7 +58,7 @@ func main() { Usage: "Print list of tables", UsageText: "clickhouse-backup tables", Action: func(c *cli.Context) error { - return backup.PrintTables(*getConfig(c), c.Bool("a")) + return backup.PrintTables(getConfig(c), c.Bool("a")) }, Flags: append(cliapp.Flags, cli.BoolFlag{ @@ -329,7 +329,6 @@ func getConfig(ctx *cli.Context) *config.Config { if err != nil { log.Fatal(err.Error()) } - log.SetLevelFromString(cfg.General.LogLevel) return cfg } diff --git a/config/config.go b/config/config.go index df484565..5874624c 100644 --- a/config/config.go +++ b/config/config.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/apex/log" "github.com/aws/aws-sdk-go/service/s3" "github.com/kelseyhightower/envconfig" yaml "gopkg.in/yaml.v2" @@ -110,6 +111,8 @@ type FTPConfig struct { Path string `yaml:"path" envconfig:"FTP_PATH"` CompressionFormat string `yaml:"compression_format" envconfig:"FTP_COMPRESSION_FORMAT"` CompressionLevel int `yaml:"compression_level" envconfig:"FTP_COMPRESSION_LEVEL"` + Concurrency uint8 `yaml:"concurrency" 
envconfig:"FTP_CONCURRENCY"` + Debug bool `yaml:"debug" envconfig:"FTP_DEBUG"` } // SFTPConfig - sftp settings section @@ -118,7 +121,7 @@ type SFTPConfig struct { Port uint `yaml:"port" envconfig:"SFTP_PORT"` Username string `yaml:"username" envconfig:"SFTP_USERNAME"` Password string `yaml:"password" envconfig:"SFTP_PASSWORD"` - Key string `yaml:"key" envconfig:"SFTP_PASSWORD"` + Key string `yaml:"key" envconfig:"SFTP_KEY"` Path string `yaml:"path" envconfig:"SFTP_PATH"` CompressionFormat string `yaml:"compression_format" envconfig:"SFTP_COMPRESSION_FORMAT"` CompressionLevel int `yaml:"compression_level" envconfig:"SFTP_COMPRESSION_LEVEL"` @@ -209,7 +212,7 @@ func (cfg *Config) GetCompressionFormat() string { } } -// LoadConfig - load config from file +// LoadConfig - load config from file + environment variables func LoadConfig(configLocation string) (*Config, error) { cfg := DefaultConfig() configYaml, err := ioutil.ReadFile(configLocation) @@ -223,6 +226,7 @@ func LoadConfig(configLocation string) (*Config, error) { return nil, err } cfg.AzureBlob.Path = strings.TrimPrefix(cfg.AzureBlob.Path, "/") + log.SetLevelFromString(cfg.General.LogLevel) return cfg, ValidateConfig(cfg) } @@ -230,6 +234,12 @@ func ValidateConfig(cfg *Config) error { if cfg.GetCompressionFormat() == "unknown" { return fmt.Errorf("'%s' is unknown remote storage", cfg.General.RemoteStorage) } + if cfg.General.RemoteStorage == "ftp" && (cfg.FTP.Concurrency < cfg.General.DownloadConcurrency || cfg.FTP.Concurrency < cfg.General.UploadConcurrency) { + return fmt.Errorf( + "FTP_CONCURRENCY=%d should be great or equal than DOWNLOAD_CONCURRENCY=%d and UPLOAD_CONCURRENCY=%d", + cfg.FTP.Concurrency, cfg.General.DownloadConcurrency, cfg.General.UploadConcurrency, + ) + } if cfg.GetCompressionFormat() == "lz4" { return fmt.Errorf("clickhouse already compressed data by lz4") } @@ -344,6 +354,7 @@ func DefaultConfig() *Config { }, FTP: FTPConfig{ Timeout: "2m", + Concurrency: availableConcurrency, 
CompressionFormat: "tar", CompressionLevel: 1, }, diff --git a/go.mod b/go.mod index 4b452f54..153dc0b5 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/jlaffaye/ftp v0.0.0-20210307004419-5d4190119067 github.com/jmoiron/sqlx v1.3.4 + github.com/jolestar/go-commons-pool/v2 v2.1.1 github.com/kelseyhightower/envconfig v1.4.0 github.com/mattn/go-shellwords v1.0.12 github.com/mholt/archiver v1.1.3-0.20190812163345-2d1449806793 diff --git a/go.sum b/go.sum index ffd2152a..d118adf3 100644 --- a/go.sum +++ b/go.sum @@ -135,6 +135,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -256,6 +258,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/jmoiron/sqlx v1.3.4 h1:wv+0IJZfL5z0uZoUjlpKgHkgaFSYD+r9CfrXjEXsO7w= github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= +github.com/jolestar/go-commons-pool/v2 v2.1.1 h1:KrbCEvx5KhwcHzLTWIE8SJJQL7zzNto5in+wnO9/gSA= +github.com/jolestar/go-commons-pool/v2 v2.1.1/go.mod h1:kTOzcguO2zUoEd+BySdg7Xhk/YE0HEr2bAHdWDkhMXg= github.com/jpillora/backoff 
v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index fd8c079c..e8fd4bdf 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -99,12 +99,12 @@ func CreateBackup(cfg *config.Config, backupName, tablePattern string, schemaOnl allDatabases, err := ch.GetDatabases() if err != nil { - return fmt.Errorf("cat't get database engines from clickhouse: %v", err) + return fmt.Errorf("can't get database engines from clickhouse: %v", err) } - allTables, err := ch.GetTables() + allTables, err := ch.GetTables(tablePattern) if err != nil { - return fmt.Errorf("cat't get tables from clickhouse: %v", err) + return fmt.Errorf("can't get tables from clickhouse: %v", err) } tables := filterTablesByPattern(allTables, tablePattern) i := 0 diff --git a/pkg/backup/flashback.go b/pkg/backup/flashback.go index 83db48af..214915a0 100644 --- a/pkg/backup/flashback.go +++ b/pkg/backup/flashback.go @@ -29,7 +29,7 @@ func CopyPartHashes(cfg config.Config, tablePattern string, backupName string) e return fmt.Errorf("can't get data path from clickhouse: %v\nyou can set data_path in config file", err) } - allTables, err := ch.GetTables() + allTables, err := ch.GetTables(tablePattern) if err != nil { return fmt.Errorf("can't get tables from clickhouse: %v", err) } diff --git a/pkg/backup/print.go b/pkg/backup/print.go index facdba25..e90cf4c4 100644 --- a/pkg/backup/print.go +++ b/pkg/backup/print.go @@ -243,8 +243,8 @@ func GetRemoteBackups(cfg *config.Config) ([]new_storage.Backup, error) { return backupList, err } -// getTables - get all tables for use by PrintTables and API -func GetTables(cfg config.Config) ([]clickhouse.Table, error) { +// GetTables - get all tables for use by PrintTables and API +func 
GetTables(cfg *config.Config) ([]clickhouse.Table, error) { ch := &clickhouse.ClickHouse{ Config: &cfg.ClickHouse, } @@ -254,7 +254,7 @@ func GetTables(cfg config.Config) ([]clickhouse.Table, error) { } defer ch.Close() - allTables, err := ch.GetTables() + allTables, err := ch.GetTables("") if err != nil { return []clickhouse.Table{}, fmt.Errorf("can't get tables: %v", err) } @@ -262,7 +262,7 @@ func GetTables(cfg config.Config) ([]clickhouse.Table, error) { } // PrintTables - print all tables suitable for backup -func PrintTables(cfg config.Config, printAll bool) error { +func PrintTables(cfg *config.Config, printAll bool) error { ch := &clickhouse.ClickHouse{ Config: &cfg.ClickHouse, } diff --git a/pkg/backup/restore.go b/pkg/backup/restore.go index e97f0d76..425eb8c0 100644 --- a/pkg/backup/restore.go +++ b/pkg/backup/restore.go @@ -308,7 +308,7 @@ func RestoreData(cfg *config.Config, ch *clickhouse.ClickHouse, backupName strin return fmt.Errorf("no have found schemas by %s in %s", tablePattern, backupName) } log.Debugf("found %d tables with data in backup", len(tablesForRestore)) - chTables, err := ch.GetTables() + chTables, err := ch.GetTables(tablePattern) if err != nil { return err } diff --git a/pkg/clickhouse/clickhouse.go b/pkg/clickhouse/clickhouse.go index 6c16595d..eaf070bb 100644 --- a/pkg/clickhouse/clickhouse.go +++ b/pkg/clickhouse/clickhouse.go @@ -2,6 +2,7 @@ package clickhouse import ( "database/sql" + "errors" "fmt" "net/url" "os" @@ -151,7 +152,7 @@ func (ch *ClickHouse) Close() { } // GetTables - return slice of all tables suitable for backup, MySQL and PostgreSQL database engine shall be skipped -func (ch *ClickHouse) GetTables() ([]Table, error) { +func (ch *ClickHouse) GetTables(tablePattern string) ([]Table, error) { var err error tables := make([]Table, 0) isUUIDPresent := make([]int, 0) @@ -163,6 +164,10 @@ func (ch *ClickHouse) GetTables() ([]Table, error) { return nil, err } allTablesSQL := "SELECT * FROM system.tables WHERE 
is_temporary = 0" + if tablePattern != "" { + replacer := strings.NewReplacer(".", "\\.", ",", "|", "*", ".*", "?", ".") + allTablesSQL += fmt.Sprintf(" AND match(concat(database,'.',name),'%s') ", replacer.Replace(tablePattern)) + } if len(skipDatabases) > 0 { allTablesSQL += fmt.Sprintf(" AND database NOT IN ('%s')", strings.Join(skipDatabases, "','")) } @@ -581,6 +586,17 @@ func (ch *ClickHouse) CreateTable(table Table, query string, dropTable bool) err return err } } + if !strings.Contains(query, table.Name) { + return errors.New(fmt.Sprintf("schema query ```%s``` doesn't contains table name `%s`", query, table.Name)) + } + + // fix restore schema for legacy backup, see https://github.com/AlexAkulov/clickhouse-backup/issues/268 + if strings.Contains(query, fmt.Sprintf("`%s`", table.Name)) && !strings.Contains(query, fmt.Sprintf("`%s`.`%s`", table.Database, table.Name)) { + query = strings.Replace(query, fmt.Sprintf("`%s`", table.Name), fmt.Sprintf("`%s`.`%s`", table.Database, table.Name), 1) + } else if strings.Contains(query, table.Name) && !strings.Contains(query, fmt.Sprintf("%s.%s", table.Database, table.Name)) && !strings.Contains(query, table.Database) { + query = strings.Replace(query, fmt.Sprintf("%s", table.Name), fmt.Sprintf("%s.%s", table.Database, table.Name), 1) + } + if _, err := ch.Query(query); err != nil { return err } diff --git a/pkg/new_storage/ftp.go b/pkg/new_storage/ftp.go index 86423b07..a6aa0f0e 100644 --- a/pkg/new_storage/ftp.go +++ b/pkg/new_storage/ftp.go @@ -1,23 +1,28 @@ package new_storage import ( + "context" "crypto/tls" + "fmt" + apexLog "github.com/apex/log" "io" "os" "path" "strings" + "sync" "time" "github.com/AlexAkulov/clickhouse-backup/config" - "github.com/jlaffaye/ftp" + "github.com/jolestar/go-commons-pool/v2" ) type FTP struct { - client *ftp.ServerConn - Config *config.FTPConfig - Debug bool - dirCache map[string]struct{} + clients *pool.ObjectPool + ctx context.Context + Config *config.FTPConfig + dirCache 
map[string]struct{} + dirCacheMutex sync.RWMutex } func (f *FTP) Connect() error { @@ -29,22 +34,24 @@ func (f *FTP) Connect() error { ftp.DialWithTimeout(timeout), ftp.DialWithDisabledEPSV(true), } - if f.Debug { + if f.Config.Debug { options = append(options, ftp.DialWithDebugOutput(os.Stdout)) } if f.Config.TLS { tlsConfig := tls.Config{} options = append(options, ftp.DialWithTLS(&tlsConfig)) } - c, err := ftp.Dial(f.Config.Address, options...) - if err != nil { - return err - } - if err := c.Login(f.Config.Username, f.Config.Password); err != nil { - return err + f.ctx = context.Background() + f.clients = pool.NewObjectPoolWithDefaultConfig(f.ctx, &ftpPoolFactory{options: options, ftp: f}) + if f.Config.Concurrency > 1 { + f.clients.Config.MaxTotal = int(f.Config.Concurrency) * 2 } - f.client = c + f.clients.Config.MaxIdle = 0 + f.clients.Config.MinIdle = 0 + + f.dirCacheMutex.Lock() f.dirCache = map[string]struct{}{} + f.dirCacheMutex.Unlock() return nil } @@ -52,13 +59,43 @@ func (f *FTP) Kind() string { return "FTP" } +// getConnectionFromPool *ftp.ServerConn is not thread-safe, so we need implements connection pool +func (f *FTP) getConnectionFromPool(where string) (*ftp.ServerConn, error) { + apexLog.Debugf("FTP::getConnectionFromPool(%s) active=%d idle=%d", where, f.clients.GetNumActive(), f.clients.GetNumIdle()) + client, err := f.clients.BorrowObject(f.ctx) + if err != nil { + apexLog.Errorf("can't BorrowObject from FTP Connection Pool: %v", err) + return nil, err + } + return client.(*ftp.ServerConn), nil +} + +func (f *FTP) returnConnectionToPool(where string, client *ftp.ServerConn) { + apexLog.Debugf("FTP::returnConnectionToPool(%s) active=%d idle=%d", where, f.clients.GetNumActive(), f.clients.GetNumIdle()) + if client != nil { + err := f.clients.ReturnObject(f.ctx, client) + if err != nil { + apexLog.Errorf("can't ReturnObject to FTP Connection Pool: %v", err) + } + } +} + func (f *FTP) StatFile(key string) (RemoteFile, error) { // cant list 
files, so check the dir dir := path.Dir(path.Join(f.Config.Path, key)) - entries, err := f.client.List(dir) + client, err := f.getConnectionFromPool("StatFile") + defer f.returnConnectionToPool("StatFile", client) if err != nil { return nil, err } + entries, err := client.List(dir) + if err != nil { + // proftpd return 550 error if `dir` not exists + if strings.HasPrefix(err.Error(), "550") { + return nil, ErrNotFound + } + return nil, err + } file := path.Base(path.Join(f.Config.Path, key)) for i := range entries { if file == entries[i].Name { @@ -75,17 +112,34 @@ func (f *FTP) StatFile(key string) (RemoteFile, error) { } func (f *FTP) DeleteFile(key string) error { - return f.client.Delete(path.Join(f.Config.Path, key)) + client, err := f.getConnectionFromPool("DeleteFile") + defer f.returnConnectionToPool("DeleteFile", client) + if err != nil { + return err + } + return client.RemoveDirRecur(path.Join(f.Config.Path, key)) } -func (f *FTP) Walk(ftpPath string, recirsive bool, process func(RemoteFile) error) error { +func (f *FTP) Walk(ftpPath string, recursive bool, process func(RemoteFile) error) error { + client, err := f.getConnectionFromPool("Walk") + defer f.returnConnectionToPool("Walk", client) + if err != nil { + return err + } prefix := path.Join(f.Config.Path, ftpPath) - if !recirsive { - entries, err := f.client.List(prefix) + if !recursive { + entries, err := client.List(prefix) if err != nil { + // proftpd return 550 error if prefix not exits + if strings.HasPrefix(err.Error(), "550") { + return nil + } return err } for _, entry := range entries { + if entry.Name == "." || entry.Name == ".." 
{ + continue + } if err := process(&ftpFile{ size: int64(entry.Size), lastModified: entry.Time, @@ -96,7 +150,7 @@ func (f *FTP) Walk(ftpPath string, recirsive bool, process func(RemoteFile) erro } return nil } - walker := f.client.Walk(prefix) + walker := client.Walk(prefix) for walker.Next() { if err := walker.Err(); err != nil { return err @@ -117,13 +171,32 @@ func (f *FTP) Walk(ftpPath string, recirsive bool, process func(RemoteFile) erro } func (f *FTP) GetFileReader(key string) (io.ReadCloser, error) { - return f.client.Retr(path.Join(f.Config.Path, key)) + apexLog.Debugf("FTP::GetFileReader key=%s", key) + client, err := f.getConnectionFromPool("GetFileReader") + if err != nil { + return nil, err + } + resp, err := client.Retr(path.Join(f.Config.Path, key)) + return &FTPFileReader{ + Response: resp, + pool: f, + client: client, + }, err } func (f *FTP) PutFile(key string, r io.ReadCloser) error { + apexLog.Debugf("FTP::PutFile key=%s", key) + client, err := f.getConnectionFromPool("PutFile") + defer f.returnConnectionToPool("PutFile", client) + if err != nil { + return err + } k := path.Join(f.Config.Path, key) - f.MkdirAll(path.Dir(k)) - return f.client.Stor(k, r) + err = f.MkdirAll(path.Dir(k)) + if err != nil { + return err + } + return client.Stor(k, r) } type ftpFile struct { @@ -145,15 +218,74 @@ func (f *ftpFile) Name() string { } func (f *FTP) MkdirAll(key string) error { + client, err := f.getConnectionFromPool(fmt.Sprintf("MkDirAll(%s)", key)) + defer f.returnConnectionToPool(fmt.Sprintf("MkDirAll(%s)", key), client) + if err != nil { + return err + } dirs := strings.Split(key, "/") - f.client.ChangeDir("/") + err = client.ChangeDir("/") + if err != nil { + return err + } + for i := range dirs { d := path.Join(dirs[:i+1]...) 
+ + f.dirCacheMutex.RLock() if _, ok := f.dirCache[d]; ok { + f.dirCacheMutex.RUnlock() continue } - f.client.MakeDir(d) + f.dirCacheMutex.RUnlock() + _ = client.MakeDir(d) + + f.dirCacheMutex.Lock() f.dirCache[d] = struct{}{} + f.dirCacheMutex.Unlock() + } + return nil +} + +type FTPFileReader struct { + *ftp.Response + pool *FTP + client *ftp.ServerConn +} + +func (fr *FTPFileReader) Close() error { + defer fr.pool.returnConnectionToPool("FTPFileReader.Close", fr.client) + return fr.Response.Close() +} + +type ftpPoolFactory struct { + options []ftp.DialOption + ftp *FTP +} + +func (f *ftpPoolFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) { + c, err := ftp.Dial(f.ftp.Config.Address, f.options...) + if err != nil { + return nil, err + } + if err := c.Login(f.ftp.Config.Username, f.ftp.Config.Password); err != nil { + return nil, err } + return pool.NewPooledObject(c), nil +} + +func (f *ftpPoolFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error { + return object.Object.(*ftp.ServerConn).Quit() +} + +func (f *ftpPoolFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool { + return true +} + +func (f *ftpPoolFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error { + return nil +} + +func (f *ftpPoolFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error { return nil } diff --git a/pkg/new_storage/general.go b/pkg/new_storage/general.go index 86bcc34a..b8639515 100644 --- a/pkg/new_storage/general.go +++ b/pkg/new_storage/general.go @@ -67,7 +67,7 @@ func (bd *BackupDestination) RemoveOldBackups(keep int) error { } func (bd *BackupDestination) RemoveBackup(backup Backup) error { - if bd.Kind() == "SFTP" { + if bd.Kind() == "SFTP" || bd.Kind() == "FTP" { return bd.DeleteFile(backup.BackupName) } if backup.Legacy { @@ -450,7 +450,6 @@ func NewBackupDestination(cfg *config.Config) (*BackupDestination, error) { case "ftp": ftpStorage := &FTP{ Config: 
&cfg.FTP, - Debug: cfg.General.LogLevel == "debug", } return &BackupDestination{ ftpStorage, diff --git a/pkg/server/server.go b/pkg/server/server.go index fa8b6071..fe2ea7c6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -68,6 +68,7 @@ func (status *AsyncStatus) start(command string) { Start: time.Now().Format(APITimeFormat), Status: InProgressText, }) + apexLog.Debugf("api.status.Start -> status.commands[%d] == %v", len(status.commands)-1, status.commands[len(status.commands)-1]) } func (status *AsyncStatus) inProgress() bool { @@ -75,8 +76,10 @@ func (status *AsyncStatus) inProgress() bool { defer status.RUnlock() n := len(status.commands) - 1 if n < 0 { + apexLog.Debugf("api.status.inProgress -> len(status.commands)=%d, inProgress=false", len(status.commands)) return false } + apexLog.Debugf("api.status.inProgress -> status.commands[n].Status == %s, inProgress=%v", status.commands[n].Status, status.commands[n].Status == InProgressText) return status.commands[n].Status == InProgressText } @@ -91,12 +94,44 @@ func (status *AsyncStatus) stop(err error) { } status.commands[n].Status = s status.commands[n].Finish = time.Now().Format(APITimeFormat) + apexLog.Debugf("api.status.stop -> status.commands[%d] == %v", n, status.commands[n]) } -func (status *AsyncStatus) status() []ActionRow { +func (status *AsyncStatus) status(current bool, filter string, last int) []ActionRow { status.RLock() defer status.RUnlock() - return status.commands + if current { + last = 1 + } + commands := &status.commands + l := len(*commands) + if l == 0 { + return status.commands + } + + filteredCommands := make([]ActionRow, 0) + if filter != "" { + for _, command := range *commands { + if strings.Contains(command.Command, filter) || strings.Contains(command.Status, filter) || strings.Contains(command.Error, filter) { + filteredCommands = append(filteredCommands, command) + } + } + if len(filteredCommands) == 0 { + return filteredCommands + } + commands = &filteredCommands + 
} + + begin, end := 0, 1 + l = len(*commands) + if last > 0 && l > last { + begin = l - last + end = l + } else { + begin = 0 + end = l + } + return (*commands)[begin:end] } var ( @@ -142,8 +177,8 @@ func Server(c *cli.App, configPath string, clickhouseBackupVersion string) error } } api.metrics = setupMetrics() - if err := api.updateSizeOfLastBackup(); err != nil { - apexLog.Error(err.Error()) + if err := api.updateSizeOfLastBackup(false); err != nil { + apexLog.Warn(err.Error()) } apexLog.Infof("Starting API server on %s", api.config.API.ListenAddr) sigterm := make(chan os.Signal, 1) @@ -187,10 +222,19 @@ func (api *APIServer) Restart() error { } api.server = server if api.config.API.Secure { - go api.server.ListenAndServeTLS(api.config.API.CertificateFile, api.config.API.PrivateKeyFile) + go func() { + err = api.server.ListenAndServeTLS(api.config.API.CertificateFile, api.config.API.PrivateKeyFile) + if err != nil { + apexLog.Fatalf("ListenAndServeTLS error: %s", err.Error()) + } + }() return nil } - go api.server.ListenAndServe() + go func() { + if err = api.server.ListenAndServe(); err != nil { + apexLog.Fatalf("ListenAndServe error: %s", err.Error()) + } + }() return nil } @@ -209,6 +253,7 @@ func (api *APIServer) setupAPIServer() *http.Server { r.HandleFunc("/backup/tables", api.httpTablesHandler).Methods("GET") r.HandleFunc("/backup/list", api.httpListHandler).Methods("GET") + r.HandleFunc("/backup/list/{where}", api.httpListHandler).Methods("GET") r.HandleFunc("/backup/create", api.httpCreateHandler).Methods("POST") r.HandleFunc("/backup/upload/{name}", api.httpUploadHandler).Methods("POST") r.HandleFunc("/backup/download/{name}", api.httpDownloadHandler).Methods("POST") @@ -314,7 +359,7 @@ func (api *APIServer) actions(w http.ResponseWriter, r *http.Request) { apexLog.Error(err.Error()) return } - if err := api.updateSizeOfLastBackup(); err != nil { + if err := api.updateSizeOfLastBackup(command == "create" || command == "restore"); err != nil { 
apexLog.Errorf("update size: %v", err) } api.metrics.SuccessfulCounter[command].Inc() @@ -343,7 +388,7 @@ func (api *APIServer) actions(w http.ResponseWriter, r *http.Request) { return } apexLog.Info("OK") - if err := api.updateSizeOfLastBackup(); err != nil { + if err := api.updateSizeOfLastBackup(args[1] == "local"); err != nil { apexLog.Errorf("update size: %v", err) } sendJSONEachRow(w, http.StatusCreated, struct { @@ -361,8 +406,19 @@ func (api *APIServer) actions(w http.ResponseWriter, r *http.Request) { } } -func (api *APIServer) actionsLog(w http.ResponseWriter, _ *http.Request) { - sendJSONEachRow(w, http.StatusOK, api.status.status()) +func (api *APIServer) actionsLog(w http.ResponseWriter, r *http.Request) { + var last int64 + var err error + q := r.URL.Query() + if q.Get("last") != "" { + last, err = strconv.ParseInt(q.Get("last"), 10, 16) + if err != nil { + apexLog.Warn(err.Error()) + writeError(w, http.StatusInternalServerError, "actions", err) + return + } + } + sendJSONEachRow(w, http.StatusOK, api.status.status(false, q.Get("filter"), int(last))) } // httpRootHandler - display API index @@ -384,7 +440,7 @@ func (api *APIServer) httpTablesHandler(w http.ResponseWriter, _ *http.Request) writeError(w, http.StatusInternalServerError, "list", err) return } - tables, err := backup.GetTables(*cfg) + tables, err := backup.GetTables(cfg) if err != nil { writeError(w, http.StatusInternalServerError, "tables", err) return @@ -396,7 +452,7 @@ func (api *APIServer) httpTablesHandler(w http.ResponseWriter, _ *http.Request) // CREATE TABLE system.backup_list (name String, created DateTime, size Int64, location String, desc String) ENGINE=URL('http://127.0.0.1:7171/backup/list?user=user&pass=pass', JSONEachRow) // ??? INSERT INTO system.backup_list (name,location) VALUES ('backup_name', 'remote') - upload backup // ??? 
INSERT INTO system.backup_list (name) VALUES ('backup_name') - create backup -func (api *APIServer) httpListHandler(w http.ResponseWriter, _ *http.Request) { +func (api *APIServer) httpListHandler(w http.ResponseWriter, r *http.Request) { type backupJSON struct { Name string `json:"name"` Created string `json:"created"` @@ -411,29 +467,34 @@ func (api *APIServer) httpListHandler(w http.ResponseWriter, _ *http.Request) { writeError(w, http.StatusInternalServerError, "list", err) return } - localBackups, err := backup.GetLocalBackups(cfg) - if err != nil && !os.IsNotExist(err) { - writeError(w, http.StatusInternalServerError, "list", err) - return - } - for _, b := range localBackups { - description := b.DataFormat - if b.Legacy { - description = "old-format" + vars := mux.Vars(r) + where, wherePresent := vars["where"] + + if where == "local" || !wherePresent { + localBackups, err := backup.GetLocalBackups(cfg) + if err != nil && !os.IsNotExist(err) { + writeError(w, http.StatusInternalServerError, "list", err) + return } - if b.Broken != "" { - description = b.Broken + for _, b := range localBackups { + description := b.DataFormat + if b.Legacy { + description = "old-format" + } + if b.Broken != "" { + description = b.Broken + } + backupsJSON = append(backupsJSON, backupJSON{ + Name: b.BackupName, + Created: b.CreationDate.Format(APITimeFormat), + Size: b.DataSize + b.MetadataSize, + Location: "local", + RequiredBackup: b.RequiredBackup, + Desc: description, + }) } - backupsJSON = append(backupsJSON, backupJSON{ - Name: b.BackupName, - Created: b.CreationDate.Format(APITimeFormat), - Size: b.DataSize + b.MetadataSize, - Location: "local", - RequiredBackup: b.RequiredBackup, - Desc: description, - }) } - if cfg.General.RemoteStorage != "none" { + if cfg.General.RemoteStorage != "none" && (where == "local" || !wherePresent) { remoteBackups, err := backup.GetRemoteBackups(cfg) if err != nil { writeError(w, http.StatusInternalServerError, "list", err) @@ -469,7 +530,7 
@@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request) } cfg, err := config.LoadConfig(api.configPath) if err != nil { - writeError(w, http.StatusInternalServerError, "list", err) + writeError(w, http.StatusInternalServerError, "create", err) return } tablePattern := "" @@ -481,19 +542,25 @@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request) query := r.URL.Query() if tp, exist := query["table"]; exist { tablePattern = tp[0] - fullCommand = fmt.Sprintf("%s --tables=\"%s\"", fullCommand, tp) + fullCommand = fmt.Sprintf("%s --tables=\"%s\"", fullCommand, tablePattern) } if schema, exist := query["schema"]; exist { schemaOnly, _ = strconv.ParseBool(schema[0]) - fullCommand = fmt.Sprintf("%s --schema", fullCommand) + if schemaOnly { + fullCommand = fmt.Sprintf("%s --schema", fullCommand) + } } - if schema, exist := query["rbac"]; exist { - rbacOnly, _ = strconv.ParseBool(schema[0]) - fullCommand = fmt.Sprintf("%s --rbac", fullCommand) + if rbac, exist := query["rbac"]; exist { + rbacOnly, _ = strconv.ParseBool(rbac[0]) + if rbacOnly { + fullCommand = fmt.Sprintf("%s --rbac", fullCommand) + } } - if schema, exist := query["backupConfig"]; exist { - configsOnly, _ = strconv.ParseBool(schema[0]) - fullCommand = fmt.Sprintf("%s --configs", fullCommand) + if configs, exist := query["configs"]; exist { + configsOnly, _ = strconv.ParseBool(configs[0]) + if configsOnly { + fullCommand = fmt.Sprintf("%s --configs", fullCommand) + } } if name, exist := query["name"]; exist { backupName = name[0] @@ -504,8 +571,10 @@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request) api.status.start(fullCommand) start := time.Now() api.metrics.LastStart["create"].Set(float64(start.Unix())) - defer api.metrics.LastDuration["create"].Set(float64(time.Since(start).Nanoseconds())) - defer api.metrics.LastFinish["create"].Set(float64(time.Now().Unix())) + defer func() { + 
api.metrics.LastDuration["create"].Set(float64(time.Since(start).Nanoseconds())) + api.metrics.LastFinish["create"].Set(float64(time.Now().Unix())) + }() err := backup.CreateBackup(cfg, backupName, tablePattern, schemaOnly, rbacOnly, configsOnly, api.clickhouseBackupVersion) defer api.status.stop(err) if err != nil { @@ -514,7 +583,7 @@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request) apexLog.Errorf("CreateBackup error: %v", err) return } - if err := api.updateSizeOfLastBackup(); err != nil { + if err := api.updateSizeOfLastBackup(true); err != nil { apexLog.Errorf("update size: %v", err) } api.metrics.SuccessfulCounter["create"].Inc() @@ -569,8 +638,10 @@ func (api *APIServer) httpUploadHandler(w http.ResponseWriter, r *http.Request) api.status.start(fullCommand) start := time.Now() api.metrics.LastStart["upload"].Set(float64(start.Unix())) - defer api.metrics.LastDuration["upload"].Set(float64(time.Since(start).Nanoseconds())) - defer api.metrics.LastFinish["upload"].Set(float64(time.Now().Unix())) + defer func() { + api.metrics.LastDuration["upload"].Set(float64(time.Since(start).Nanoseconds())) + api.metrics.LastFinish["upload"].Set(float64(time.Now().Unix())) + }() b := backup.NewBackuper(cfg) err := b.Upload(name, tablePattern, diffFrom, schemaOnly) api.status.stop(err) @@ -580,7 +651,7 @@ func (api *APIServer) httpUploadHandler(w http.ResponseWriter, r *http.Request) api.metrics.LastStatus["upload"].Set(0) return } - if err := api.updateSizeOfLastBackup(); err != nil { + if err := api.updateSizeOfLastBackup(false); err != nil { apexLog.Errorf("update size: %v", err) } api.metrics.SuccessfulCounter["upload"].Inc() @@ -661,8 +732,10 @@ func (api *APIServer) httpRestoreHandler(w http.ResponseWriter, r *http.Request) api.status.start(fullCommand) start := time.Now() api.metrics.LastStart["restore"].Set(float64(start.Unix())) - defer api.metrics.LastDuration["restore"].Set(float64(time.Since(start).Nanoseconds())) - defer 
api.metrics.LastFinish["restore"].Set(float64(time.Now().Unix())) + defer func() { + api.metrics.LastDuration["restore"].Set(float64(time.Since(start).Nanoseconds())) + api.metrics.LastFinish["restore"].Set(float64(time.Now().Unix())) + }() err := backup.Restore(cfg, name, tablePattern, schemaOnly, dataOnly, dropTable, rbacOnly, configsOnly) api.status.stop(err) if err != nil { @@ -718,8 +791,11 @@ func (api *APIServer) httpDownloadHandler(w http.ResponseWriter, r *http.Request api.status.start(fullCommand) start := time.Now() api.metrics.LastStart["download"].Set(float64(start.Unix())) - defer api.metrics.LastDuration["download"].Set(float64(time.Since(start).Nanoseconds())) - defer api.metrics.LastFinish["download"].Set(float64(time.Now().Unix())) + defer func() { + api.metrics.LastDuration["download"].Set(float64(time.Since(start).Nanoseconds())) + api.metrics.LastFinish["download"].Set(float64(time.Now().Unix())) + }() + b := backup.NewBackuper(cfg) err := b.Download(name, tablePattern, schemaOnly) api.status.stop(err) @@ -729,7 +805,7 @@ func (api *APIServer) httpDownloadHandler(w http.ResponseWriter, r *http.Request api.metrics.LastStatus["download"].Set(0) return } - if err := api.updateSizeOfLastBackup(); err != nil { + if err := api.updateSizeOfLastBackup(false); err != nil { apexLog.Errorf("update size: %v", err) } api.metrics.SuccessfulCounter["download"].Inc() @@ -776,7 +852,7 @@ func (api *APIServer) httpDeleteHandler(w http.ResponseWriter, r *http.Request) writeError(w, http.StatusInternalServerError, "delete", err) return } - if err := api.updateSizeOfLastBackup(); err != nil { + if err := api.updateSizeOfLastBackup(vars["where"] == "local"); err != nil { apexLog.Errorf("update size: %v", err) } sendJSONEachRow(w, http.StatusOK, struct { @@ -793,11 +869,12 @@ func (api *APIServer) httpDeleteHandler(w http.ResponseWriter, r *http.Request) } func (api *APIServer) httpBackupStatusHandler(w http.ResponseWriter, _ *http.Request) { - sendJSONEachRow(w, 
http.StatusOK, api.status.status()) + sendJSONEachRow(w, http.StatusOK, api.status.status(true, "", 0)) } -func (api *APIServer) updateSizeOfLastBackup() error { - apexLog.Debug("Update last backup size metrics") +func (api *APIServer) updateSizeOfLastBackup(onlyLocal bool) error { + apexLog.Infof("Update last backup size metrics start (onlyLocal=%v)", onlyLocal) + defer apexLog.Infof("Update last backup size metrics finish") if !api.config.API.EnableMetrics { return nil } @@ -810,7 +887,7 @@ func (api *APIServer) updateSizeOfLastBackup() error { } else { api.metrics.LastBackupSizeLocal.Set(0) } - if api.config.General.RemoteStorage == "none" { + if api.config.General.RemoteStorage == "none" || onlyLocal { return nil } remoteBackups, err := backup.GetRemoteBackups(api.config) @@ -999,11 +1076,11 @@ func (api *APIServer) CreateIntegrationTables() error { if api.config.API.Secure { schema = "https" } - query := fmt.Sprintf("CREATE TABLE system.backup_actions (command String, start DateTime, finish DateTime, status String, error String) ENGINE=URL('%s://127.0.0.1:%s/backup/actions%s', JSONEachRow)", schema, port, auth) + query := fmt.Sprintf("CREATE TABLE system.backup_actions (command String, start DateTime, finish DateTime, status String, error String) ENGINE=URL('%s://127.0.0.1:%s/backup/actions%s', JSONEachRow) SETTINGS input_format_skip_unknown_fields=1", schema, port, auth) if err := ch.CreateTable(clickhouse.Table{Database: "system", Name: "backup_actions"}, query, true); err != nil { return err } - query = fmt.Sprintf("CREATE TABLE system.backup_list (name String, created DateTime, size Int64, location String, desc String) ENGINE=URL('%s://127.0.0.1:%s/backup/list%s', JSONEachRow)", schema, port, auth) + query = fmt.Sprintf("CREATE TABLE system.backup_list (name String, created DateTime, size Int64, location String, required String, desc String) ENGINE=URL('%s://127.0.0.1:%s/backup/list%s', JSONEachRow) SETTINGS input_format_skip_unknown_fields=1", schema, 
port, auth) if err := ch.CreateTable(clickhouse.Table{Database: "system", Name: "backup_list"}, query, true); err != nil { return err } diff --git a/test/integration/config-ftp.yaml b/test/integration/config-ftp.yaml new file mode 100644 index 00000000..2ef8c35d --- /dev/null +++ b/test/integration/config-ftp.yaml @@ -0,0 +1,24 @@ +general: + disable_progress_bar: true + remote_storage: ftp + upload_concurrency: 4 + download_concurrency: 4 +clickhouse: + host: 127.0.0.1 + port: 9440 + username: backup + password: meow=& 123?*%# МЯУ + secure: true + skip_verify: true + restart_command: bash -c 'echo "FAKE RESTART"' +ftp: + address: "ftp:21" + username: "test_backup" + password: "test_backup" + tls: false + path: "/backup" + compression_format: tar + compression_level: 1 + concurrency: 4 +api: + listen: :7171 diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index 79400d08..c610154b 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.3" services: sshd: - image: panubo/sshd:latest + image: docker.io/panubo/sshd:latest container_name: sshd environment: SSH_ENABLE_ROOT: "true" @@ -12,8 +12,22 @@ services: networks: - clickhouse-backup + ftp: + image: docker.io/fauria/vsftpd:latest + container_name: ftp + environment: + FTP_USER: test_backup + FTP_PASS: test_backup + PASV_ENABLE: "YES" + PASV_ADDRESS: "ftp" + PASV_ADDR_RESOLVE: "YES" + PASV_MIN_PORT: 21100 + PASV_MAX_PORT: 21110 + networks: + - clickhouse-backup + minio: - image: minio/minio:${MINIO_VERSION:-latest} + image: docker.io/minio/minio:${MINIO_VERSION:-latest} container_name: minio environment: MINIO_ACCESS_KEY: access-key @@ -45,13 +59,13 @@ services: - clickhouse-backup zookeeper: - image: zookeeper:${ZOOKEEPER_VERSION:-latest} + image: docker.io/zookeeper:${ZOOKEEPER_VERSION:-latest} container_name: zookeeper networks: - clickhouse-backup clickhouse: - image: 
${CLICKHOUSE_IMAGE:-yandex/clickhouse-server}:${CLICKHOUSE_VERSION:-1.1.54390} + image: docker.io/${CLICKHOUSE_IMAGE:-yandex/clickhouse-server}:${CLICKHOUSE_VERSION:-1.1.54390} container_name: clickhouse user: root environment: @@ -59,11 +73,12 @@ services: LOG_LEVEL: "${LOG_LEVEL:-info}" S3_DEBUG: "${S3_DEBUG:-false}" GCS_DEBUG: "${GCS_DEBUG:-false}" + FTP_DEBUG: "${FTP_DEBUG:-false}" # STORAGE_EMULATOR_HOST: "http://gsc:8080" # GOOGLE_API_USE_CLIENT_CERTIFICATE: "false" volumes: - ./backup-user.xml:/etc/clickhouse-server/users.d/backup-user.xml - - ${CLICKHOUSE_BACKUP_BIN:-../../clickhouse-backup/clickhouse-backup}:/usr/bin/clickhouse-backup + - ${CLICKHOUSE_BACKUP_BIN:-../../clickhouse-backup/clickhouse-backup-race}:/usr/bin/clickhouse-backup - ./credentials.json:/etc/clickhouse-backup/credentials.json - ./server.crt:/etc/clickhouse-server/server.crt - ./server.key:/etc/clickhouse-server/server.key @@ -80,7 +95,8 @@ services: - zookeeper - minio - sshd - - azure + - ftp +# - azure # - gcs networks: diff --git a/test/integration/docker-compose_advanced.yml b/test/integration/docker-compose_advanced.yml index a99bcb8e..31d4d12c 100644 --- a/test/integration/docker-compose_advanced.yml +++ b/test/integration/docker-compose_advanced.yml @@ -1,7 +1,7 @@ version: "3.3" services: sshd: - image: panubo/sshd:latest + image: docker.io/panubo/sshd:latest container_name: sshd environment: SSH_ENABLE_ROOT: "true" @@ -12,8 +12,33 @@ services: networks: - clickhouse-backup +# ftp: +# image: docker.io/fauria/vsftpd:latest +# container_name: ftp +# environment: +# FTP_USER: test_backup +# FTP_PASS: test_backup +# PASV_ENABLE: "YES" +# PASV_ADDRESS: "ftp" +# PASV_ADDR_RESOLVE: "YES" +# PASV_MIN_PORT: 21100 +# PASV_MAX_PORT: 21110 +# networks: +# - clickhouse-backup + + ftp: + image: docker.io/iradu/proftpd:latest + container_name: ftp + environment: + FTP_USER_NAME: "test_backup" + FTP_USER_PASS: "test_backup" + FTP_MASQUERADEADDRESS: "yes" + FTP_PASSIVE_PORTS: "21100 31100" + 
networks: + - clickhouse-backup + minio: - image: minio/minio:${MINIO_VERSION:-latest} + image: docker.io/minio/minio:${MINIO_VERSION:-latest} container_name: minio environment: MINIO_ACCESS_KEY: access-key @@ -45,7 +70,7 @@ services: - clickhouse-backup mysql: - image: mysql:${MYSQL_VERSION:-latest} + image: docker.io/mysql:${MYSQL_VERSION:-latest} command: --default_authentication_plugin='mysql_native_password' container_name: mysql environment: @@ -56,13 +81,13 @@ services: - clickhouse-backup zookeeper: - image: zookeeper:${ZOOKEEPER_VERSION:-latest} + image: docker.io/zookeeper:${ZOOKEEPER_VERSION:-latest} container_name: zookeeper networks: - clickhouse-backup clickhouse: - image: ${CLICKHOUSE_IMAGE:-yandex/clickhouse-server}:${CLICKHOUSE_VERSION:-19.17} + image: docker.io/${CLICKHOUSE_IMAGE:-yandex/clickhouse-server}:${CLICKHOUSE_VERSION:-19.17} container_name: clickhouse user: root environment: @@ -70,12 +95,13 @@ services: LOG_LEVEL: "${LOG_LEVEL:-info}" S3_DEBUG: "${S3_DEBUG:-false}" GCS_DEBUG: "${GCS_DEBUG:-false}" + FTP_DEBUG: "${FTP_DEBUG:-false}" # STORAGE_EMULATOR_HOST: "http://gsc:8080" # GOOGLE_API_USE_CLIENT_CERTIFICATE: "false" volumes: - ./backup-user.xml:/etc/clickhouse-server/users.d/backup-user.xml - ./enable-access_management.xml:/etc/clickhouse-server/users.d/enable-access_management.xml - - ${CLICKHOUSE_BACKUP_BIN:-../../clickhouse-backup/clickhouse-backup}:/usr/bin/clickhouse-backup + - ${CLICKHOUSE_BACKUP_BIN:-../../clickhouse-backup/clickhouse-backup-race}:/usr/bin/clickhouse-backup - ./credentials.json:/etc/clickhouse-backup/credentials.json - ./server.crt:/etc/clickhouse-server/server.crt - ./server.key:/etc/clickhouse-server/server.key @@ -96,7 +122,8 @@ services: - minio - sshd - mysql - - azure + - ftp +# - azure # - gcs networks: diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index a95ef794..b0785d6f 100644 --- a/test/integration/integration_test.go +++ 
b/test/integration/integration_test.go @@ -368,8 +368,7 @@ func TestIntegrationGCS(t *testing.T) { } r := require.New(t) r.NoError(dockerCP("config-gcs.yml", "clickhouse:/etc/clickhouse-backup/config.yml")) - r.NoError(dockerExec("clickhouse", "apt-get", "-y", "update")) - r.NoError(dockerExec("clickhouse", "apt-get", "-y", "install", "ca-certificates")) + installDebIfNotExists(r, "clickhouse", "ca-certificates") testCommon(t) } @@ -380,8 +379,7 @@ func TestIntegrationAzure(t *testing.T) { } r := require.New(t) r.NoError(dockerCP("config-azblob.yml", "clickhouse:/etc/clickhouse-backup/config.yml")) - r.NoError(dockerExec("clickhouse", "apt-get", "-y", "update")) - r.NoError(dockerExec("clickhouse", "apt-get", "-y", "install", "ca-certificates")) + installDebIfNotExists(r, "clickhouse", "ca-certificates") testCommon(t) } @@ -406,6 +404,12 @@ func TestIntegrationSFTPAuthKey(t *testing.T) { testCommon(t) } +func TestIntegrationFTP(t *testing.T) { + r := require.New(t) + r.NoError(dockerCP("config-ftp.yaml", "clickhouse:/etc/clickhouse-backup/config.yml")) + testCommon(t) +} + func TestSyncReplicaTimeout(t *testing.T) { if compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "19.11") == -1 { t.Skipf("Test skipped, SYNC REPLICA ignore receive_timeout for %s version", os.Getenv("CLICKHOUSE_VERSION")) @@ -445,6 +449,46 @@ func TestSyncReplicaTimeout(t *testing.T) { } +func TestUnlockAPI(t *testing.T) { + ch := &TestClickHouse{} + r := require.New(t) + ch.connectWithWait(r) + r.NoError(dockerCP("config-gcs.yml", "clickhouse:/etc/clickhouse-backup/config.yml")) + ch.queryWithNoError(r, "CREATE DATABASE IF NOT EXISTS long_schema") + defer func() { + ch.chbackup.Close() + }() + fieldTypes := []string{"UInt64", "String", "Int"} + installDebIfNotExists(r, "clickhouse", "curl") + maxTables := 10 + minFields := 10 + randFields := 10 + log.Infof("Create %d long_schema.t%%d` tables with with %d..%d fields...", maxTables, minFields, minFields+randFields) + for i := 0; i < maxTables; i++ 
{ + sql := fmt.Sprintf("CREATE TABLE long_schema.t%d (id UInt64", i) + fieldsCount := minFields + rand.Intn(randFields) + for j := 0; j < fieldsCount; j++ { + fieldType := fieldTypes[rand.Intn(len(fieldTypes))] + sql += fmt.Sprintf(", f%d %s", j, fieldType) + } + sql += ") ENGINE=MergeTree() ORDER BY id" + ch.queryWithNoError(r, sql) + } + log.Info("...DONE") + + log.Info("Run `clickhouse-backup server` in background") + r.NoError(dockerExec("-d", "clickhouse", "bash", "-c", "clickhouse-backup server &>>/tmp/clickhouse-backup-server.log")) + time.Sleep(1 * time.Second) + log.Info("...DONE") + log.Info("Create multiple backups") + out, err := dockerExecOut("clickhouse", "bash", "-c", "for i in {1..5}; do date; curl -sL -XPOST 'http://localhost:7171/backup/create?table=long_schema.*'; sleep 1; done") + log.Info("...DONE") + + r.NoError(err) + r.NotContains(out, "Connection refused") + r.NotContains(out, "another operation is currently running") + ch.dropDatabase("long_schema") +} func TestDoRestoreRBAC(t *testing.T) { if compareVersion(os.Getenv("CLICKHOUSE_VERSION"), "20.4") == -1 { t.Skipf("Test skipped, RBAC not available for %s version", os.Getenv("CLICKHOUSE_VERSION")) @@ -604,7 +648,7 @@ func testCommon(t *testing.T) { incrementBackupName := fmt.Sprintf("increment_%d", rand.Int()) log.Info("Clean before start") - fullCleanup(r, ch, testBackupName, incrementBackupName) + fullCleanup(r, ch, []string{testBackupName, incrementBackupName}, false) generateTestData(ch, r) @@ -692,16 +736,20 @@ func testCommon(t *testing.T) { } log.Info("Clean after finish") - fullCleanup(r, ch, testBackupName, incrementBackupName) + fullCleanup(r, ch, []string{testBackupName, incrementBackupName}, true) ch.chbackup.Close() } -func fullCleanup(r *require.Assertions, ch *TestClickHouse, testBackupName string, incrementBackupName string) { - _ = dockerExec("clickhouse", "clickhouse-backup", "delete", "remote", testBackupName) - _ = dockerExec("clickhouse", "clickhouse-backup", "delete", 
"local", testBackupName) - _ = dockerExec("clickhouse", "clickhouse-backup", "delete", "remote", incrementBackupName) - _ = dockerExec("clickhouse", "clickhouse-backup", "delete", "local", incrementBackupName) +func fullCleanup(r *require.Assertions, ch *TestClickHouse, backupNames []string, checkDeleteErr bool) { + for _, backupName := range backupNames { + for _, backupType := range []string{"remote", "local"} { + err := dockerExec("clickhouse", "clickhouse-backup", "delete", backupType, backupName) + if checkDeleteErr { + r.NoError(err) + } + } + } dropAllDatabases(r, ch) } @@ -988,3 +1036,14 @@ func isTestShouldSkip(envName string) bool { isSkip, _ := map[string]bool{"": true, "0": true, "false": true, "False": true, "1": false, "True": false, "true": false}[os.Getenv(envName)] return isSkip } + +func installDebIfNotExists(r *require.Assertions, container, pkg string) { + r.NoError(dockerExec( + container, + "bash", "-c", + fmt.Sprintf( + "if [[ '0' == $(dpkg -l %s | grep -c -E \"^ii\\s+%s\" ) ]]; then apt-get -y update; apt-get install -y %s; fi", + pkg, pkg, pkg, + ), + )) +} diff --git a/test/integration/run.sh b/test/integration/run.sh index fcd61adc..e796ec51 100755 --- a/test/integration/run.sh +++ b/test/integration/run.sh @@ -4,12 +4,13 @@ set -e export CLICKHOUSE_VERSION=${CLICKHOUSE_VERSION:-21.8} export CLICKHOUSE_IMAGE=${CLICKHOUSE_IMAGE:-yandex/clickhouse-server} -export CLICKHOUSE_BACKUP_BIN="$(pwd)/clickhouse-backup/clickhouse-backup" +export CLICKHOUSE_BACKUP_BIN="$(pwd)/clickhouse-backup/clickhouse-backup-race" export LOG_LEVEL=${LOG_LEVEL:-info} export GCS_TESTS=${GCS_TESTS:-} export AZURE_TESTS=${AZURE_TESTS:-} export S3_DEBUG=${S3_DEBUG:-false} export GCS_DEBUG=${GCS_DEBUG:-false} +export FTP_DEBUG=${FTP_DEBUG:-false} export GODEBUG=${GODEBUG:-} if [[ "${CLICKHOUSE_VERSION}" == 2* ]]; then @@ -21,6 +22,6 @@ fi docker-compose -f test/integration/${COMPOSE_FILE} down --remove-orphans docker volume prune -f make clean -make build +make 
build-race docker-compose -f test/integration/${COMPOSE_FILE} up -d --force-recreate go test -timeout 30m -failfast -tags=integration -run "${RUN_TESTS:-.+}" -v test/integration/integration_test.go