From 2500b41f6abe90d95c0148410fbc3f66fc3ce93a Mon Sep 17 00:00:00 2001 From: Olga Naydyonock Date: Fri, 2 Feb 2024 14:10:13 +0200 Subject: [PATCH] Filebeat pipeline migration to Buildkite (#37283) * added test scripts * added windows tests * added packaging step * updated packaging execution conditions * added arm packaging * fixed linting in filbeat test_crawler.py * added platforms for linux packaging --------- Co-authored-by: Pavel Zorin (cherry picked from commit 730dc87d0eb95e74ac1278fd9d6f425b61d73b91) --- .buildkite/env-scripts/env.sh | 13 + .buildkite/env-scripts/linux-env.sh | 24 ++ .buildkite/env-scripts/macos-env.sh | 8 + .buildkite/env-scripts/util.sh | 91 +++++ .buildkite/env-scripts/win-env.sh | 8 + .buildkite/filebeat/filebeat-pipeline.yml | 141 ++++++- .../filebeat/scripts/integration-gotests.sh | 12 + .../filebeat/scripts/integration-pytests.sh | 12 + .buildkite/filebeat/scripts/package-step.sh | 46 +++ .buildkite/filebeat/scripts/package.sh | 12 + .../filebeat/scripts/unit-tests-win.ps1 | 51 +++ .buildkite/filebeat/scripts/unit-tests.sh | 12 + .buildkite/pull-requests.json | 6 +- catalog-info.yaml | 10 +- dev-tools/packaging/package_test.go | 33 +- filebeat/filebeat_windows_amd64.syso | Bin 0 -> 1072 bytes .../filestream/internal/task/group_test.go | 344 ++++++++++++++++++ filebeat/tests/system/test_crawler.py | 22 +- 18 files changed, 813 insertions(+), 32 deletions(-) create mode 100644 .buildkite/env-scripts/env.sh create mode 100644 .buildkite/env-scripts/linux-env.sh create mode 100644 .buildkite/env-scripts/macos-env.sh create mode 100644 .buildkite/env-scripts/util.sh create mode 100644 .buildkite/env-scripts/win-env.sh create mode 100755 .buildkite/filebeat/scripts/integration-gotests.sh create mode 100755 .buildkite/filebeat/scripts/integration-pytests.sh create mode 100755 .buildkite/filebeat/scripts/package-step.sh create mode 100755 .buildkite/filebeat/scripts/package.sh create mode 100644 .buildkite/filebeat/scripts/unit-tests-win.ps1 create mode 100755 .buildkite/filebeat/scripts/unit-tests.sh create mode 100644 filebeat/filebeat_windows_amd64.syso create mode 100644 filebeat/input/filestream/internal/task/group_test.go diff --git a/.buildkite/env-scripts/env.sh b/.buildkite/env-scripts/env.sh new file mode 100644 index 000000000000..d94d03aad53b --- /dev/null +++ b/.buildkite/env-scripts/env.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +SETUP_GVM_VERSION="v0.5.1" +WORKSPACE="$(pwd)" +BIN="${WORKSPACE}/bin" +HW_TYPE="$(uname -m)" +PLATFORM_TYPE="$(uname)" + +export SETUP_GVM_VERSION +export WORKSPACE +export BIN +export HW_TYPE +export PLATFORM_TYPE diff --git a/.buildkite/env-scripts/linux-env.sh b/.buildkite/env-scripts/linux-env.sh new file mode 100644 index 000000000000..edaf1a3100c2 --- /dev/null +++ b/.buildkite/env-scripts/linux-env.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +source .buildkite/env-scripts/util.sh + +DEBIAN_FRONTEND="noninteractive" + +export DEBIAN_FRONTEND + +sudo mkdir -p /etc/needrestart +echo "\$nrconf{restart} = 'a';" | sudo tee -a /etc/needrestart/needrestart.conf > /dev/null + +# Remove this code once beats specific agent is set up +if [[ $PLATFORM_TYPE == "Linux" ]]; then + echo ":: Installing libs ::" + sudo apt-get update + sudo apt-get install -y libsystemd-dev + sudo apt install -y python3-pip + sudo apt-get install -y python3-venv +fi + +echo ":: Setting up environment ::" +add_bin_path +with_go +with_mage diff --git a/.buildkite/env-scripts/macos-env.sh b/.buildkite/env-scripts/macos-env.sh new file mode 100644 index 000000000000..ac1486b64fdd --- /dev/null +++ b/.buildkite/env-scripts/macos-env.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +if [[ $PLATFORM_TYPE == Darwin* ]]; then + echo ":: Setting larger ulimit on MacOS ::" + # To bypass file descriptor errors like "Too many open files error" on MacOS + ulimit -Sn 50000 + echo ":: ULIMIT :: $(ulimit -n)" +fi diff --git a/.buildkite/env-scripts/util.sh b/.buildkite/env-scripts/util.sh new file mode 100644 index 000000000000..157a5aff37af --- /dev/null +++ b/.buildkite/env-scripts/util.sh @@ -0,0 +1,91 @@ +#!/usr/bin/env bash + +set -euo pipefail + +add_bin_path() { + echo "Adding PATH to the environment variables..." + create_bin + export PATH="${PATH}:${BIN}" +} + +with_go() { + local go_version="${GOLANG_VERSION}" + echo "Setting up the Go environment..." + create_bin + check_platform_architecture + retry 5 curl -sL -o ${BIN}/gvm "https://github.com/andrewkroh/gvm/releases/download/${SETUP_GVM_VERSION}/gvm-${PLATFORM_TYPE}-${arch_type}" + export PATH="${PATH}:${BIN}" + chmod +x ${BIN}/gvm + eval "$(gvm "$go_version")" + go version + which go + export PATH="${PATH}:$(go env GOPATH):$(go env GOPATH)/bin" +} + +with_mage() { + local install_packages=( + "github.com/magefile/mage" + "github.com/elastic/go-licenser" + "golang.org/x/tools/cmd/goimports" + "github.com/jstemmer/go-junit-report" + "gotest.tools/gotestsum" + ) + create_bin + for pkg in "${install_packages[@]}"; do + go install "${pkg}@latest" + done +} + +create_bin() { + if [[ ! -d "${BIN}" ]]; then + mkdir -p ${BIN} + fi +} + +check_platform_architecture() { +# for downloading the GVM and Terraform packages + case "${HW_TYPE}" in + "x86_64") + arch_type="amd64" + ;; + "aarch64") + arch_type="arm64" + ;; + "arm64") + arch_type="arm64" + ;; + *) + echo "The current platform/OS type is unsupported yet" + ;; + esac +} + +retry() { + local retries=$1 + shift + local count=0 + until "$@"; do + exit=$? + wait=$((2 ** count)) + count=$((count + 1)) + if [ $count -lt "$retries" ]; then + >&2 echo "Retry $count/$retries exited $exit, retrying in $wait seconds..." + sleep $wait + else + >&2 echo "Retry $count/$retries exited $exit, no more retries left." + return $exit + fi + done + return 0 +} + +are_files_changed() { + local changeset=$1 + + if git diff --name-only HEAD@{1} HEAD | grep -qE "$changeset"; then + return 0; + else + echo "WARN! No files changed in $changeset" + return 1; + fi +} diff --git a/.buildkite/env-scripts/win-env.sh b/.buildkite/env-scripts/win-env.sh new file mode 100644 index 000000000000..aa5f67ca4cee --- /dev/null +++ b/.buildkite/env-scripts/win-env.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +install_python_win() { + if [[ ${PLATFORM_TYPE} = MINGW* ]]; then + choco install mingw -y + choco install python --version=3.11.0 -y + fi +} diff --git a/.buildkite/filebeat/filebeat-pipeline.yml b/.buildkite/filebeat/filebeat-pipeline.yml index 34321b61161b..e3d7384a71ea 100644 --- a/.buildkite/filebeat/filebeat-pipeline.yml +++ b/.buildkite/filebeat/filebeat-pipeline.yml @@ -1,5 +1,142 @@ # yaml-language-server: $schema=https://raw.githubusercontent.com/buildkite/pipeline-schema/main/schema.json +env: + IMAGE_UBUNTU_X86_64: "family/core-ubuntu-2204" + IMAGE_UBUNTU_ARM_64: "core-ubuntu-2004-aarch64" + IMAGE_WIN_2016: "family/core-windows-2016" + IMAGE_WIN_2019: "family/core-windows-2019" + IMAGE_WIN_2022: "family/core-windows-2022" + IMAGE_MACOS_X86_64: "generic-13-ventura-x64" + steps: - - label: "Example test" - command: echo "Hello!" + - group: "Filebeat Mandatory Testing" + key: "mandatory-tests" + if: build.env("GITHUB_PR_TRIGGER_COMMENT") == "filebeat" || build.env("BUILDKITE_PULL_REQUEST") != "false" + + steps: + - label: ":ubuntu: Unit Tests" + command: + - ".buildkite/filebeat/scripts/unit-tests.sh" + notify: + - github_commit_status: + context: "Filebeat: Unit Tests" + agents: + provider: "gcp" + image: "${IMAGE_UBUNTU_X86_64}" + machineType: "c2-standard-16" + artifact_paths: + - "filebeat/build/*.xml" + - "filebeat/build/*.json" + + - label: ":ubuntu: Go Integration Tests" + command: + - ".buildkite/filebeat/scripts/integration-gotests.sh" + notify: + - github_commit_status: + context: "Filebeat: Integration Tests" + agents: + provider: "gcp" + image: "${IMAGE_UBUNTU_X86_64}" + machineType: "c2-standard-16" + artifact_paths: + - "filebeat/build/*.xml" + - "filebeat/build/*.json" + + - label: ":ubuntu: Python Integration Tests" + command: + - ".buildkite/filebeat/scripts/integration-pytests.sh" + notify: + - github_commit_status: + context: "Filebeat: Python Integration Tests" + agents: + provider: "gcp" + image: "${IMAGE_UBUNTU_X86_64}" + machineType: "c2-standard-16" + artifact_paths: + - "filebeat/build/*.xml" + - "filebeat/build/*.json" + + - label: ":windows:-{{matrix.image}} Unit Tests" + command: ".buildkite/filebeat/scripts/unit-tests-win.ps1" + notify: + - github_commit_status: + context: "Filebeat: Unit Tests" + agents: + provider: "gcp" + image: "{{matrix.image}}" + machine_type: "n2-standard-8" + disk_size: 200 + disk_type: "pd-ssd" + matrix: + setup: + image: + - "${IMAGE_WIN_2016}" + - "${IMAGE_WIN_2022}" + artifact_paths: + - "filebeat/build/*.xml" + - "filebeat/build/*.json" + + - group: "Extended Testing" + key: "extended-tests" + if: build.env("BUILDKITE_PULL_REQUEST") != "false" || build.env("GITHUB_PR_TRIGGER_COMMENT") == "filebeat for extended support" + + steps: + - label: ":linux: ARM64 Unit Tests" + key: "arm-extended" + if: build.env("GITHUB_PR_TRIGGER_COMMENT") == "filebeat for arm" || build.env("GITHUB_PR_LABELS") =~ /.*arm.*/ + command: + - ".buildkite/filebeat/scripts/unit-tests.sh" + notify: + - github_commit_status: + context: "Filebeat/Extended: Unit Tests ARM" + agents: + provider: "aws" + imagePrefix: "${IMAGE_UBUNTU_ARM_64}" + instanceType: "t4g.large" + artifact_paths: "filebeat/build/*.xml" + + - label: ":mac: MacOS Unit Tests" + key: "macos-extended" + if: build.env("GITHUB_PR_TRIGGER_COMMENT") == "filebeat for macos" || build.env("GITHUB_PR_LABELS") =~ /.*macOS.*/ + command: + - ".buildkite/filebeat/scripts/unit-tests.sh" + notify: + - github_commit_status: + context: "Filebeat/Extended: MacOS Unit Tests" + agents: + provider: "orka" + imagePrefix: "${IMAGE_MACOS_X86_64}" + artifact_paths: "filebeat/build/*.xml" + + - group: "Windows Extended Testing" + key: "extended-tests-win" + if: build.env("GITHUB_PR_TRIGGER_COMMENT") == "filebeat for windows" || build.env("GITHUB_PR_LABELS") =~ /.*windows.*/ + + steps: + - label: ":windows: Win 2019 Unit Tests" + key: "win-extended-2019" + command: ".buildkite/filebeat/scripts/unit-tests-win.ps1" + notify: + - github_commit_status: + context: "Filebeat/Extended: Win-2019 Unit Tests" + agents: + provider: "gcp" + image: "${IMAGE_WIN_2019}" + machine_type: "n2-standard-8" + disk_size: 200 + disk_type: "pd-ssd" + artifact_paths: + - "filebeat/build/*.xml" + - "filebeat/build/*.json" + + - group: "Packaging" + key: "packaging" + if: build.env("BUILDKITE_PULL_REQUEST") != "false" + depends_on: + - "mandatory-tests" + - "extended-tests" + - "extended-tests-win" + + steps: + - label: Package pipeline + commands: ".buildkite/filebeat/scripts/package-step.sh | buildkite-agent pipeline upload" diff --git a/.buildkite/filebeat/scripts/integration-gotests.sh b/.buildkite/filebeat/scripts/integration-gotests.sh new file mode 100755 index 000000000000..a3eabf70c0d3 --- /dev/null +++ b/.buildkite/filebeat/scripts/integration-gotests.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -euo pipefail + +source .buildkite/env-scripts/linux-env.sh + +echo ":: Execute Integration Tests ::" +sudo chmod -R go-w filebeat/ + +cd filebeat +umask 0022 +mage goIntegTest diff --git a/.buildkite/filebeat/scripts/integration-pytests.sh b/.buildkite/filebeat/scripts/integration-pytests.sh new file mode 100755 index 000000000000..5e2e403dda87 --- /dev/null +++ b/.buildkite/filebeat/scripts/integration-pytests.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -euo pipefail + +source .buildkite/env-scripts/linux-env.sh + +echo ":: Execute Integration Tests ::" +sudo chmod -R go-w filebeat/ + +cd filebeat +umask 0022 +mage pythonIntegTest diff --git a/.buildkite/filebeat/scripts/package-step.sh b/.buildkite/filebeat/scripts/package-step.sh new file mode 100755 index 000000000000..a4127c3cd1d6 --- /dev/null +++ b/.buildkite/filebeat/scripts/package-step.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash + +set -euo pipefail + +source .buildkite/env-scripts/util.sh + +changeset="^filebeat/ + ^go.mod + ^pytest.ini + ^dev-tools/ + ^libbeat/ + ^testing/ + ^\.buildkite/filebeat/" + +if are_files_changed "$changeset"; then + cat <<-EOF + steps: + - label: ":ubuntu: Packaging Linux X86" + key: "package-linux-x86" + env: + PLATFORMS: "+all linux/amd64 linux/arm64 windows/amd64 darwin/amd64 darwin/arm64" + command: + - ".buildkite/filebeat/scripts/package.sh" + notify: + - github_commit_status: + context: "Filebeat/Packaging: Linux X86" + agents: + provider: "gcp" + image: "${IMAGE_UBUNTU_X86_64}" + + - label: ":linux: Packaging Linux ARM" + key: "package-linux-arm" + env: + PLATFORMS: "linux/arm64" + PACKAGES: "docker" + command: + - ".buildkite/filebeat/scripts/package.sh" + notify: + - github_commit_status: + context: "Filebeat/Packaging: ARM" + agents: + provider: "aws" + imagePrefix: "${IMAGE_UBUNTU_ARM_64}" + instanceType: "t4g.large" +EOF +fi diff --git a/.buildkite/filebeat/scripts/package.sh b/.buildkite/filebeat/scripts/package.sh new file mode 100755 index 000000000000..2ae226eb739c --- /dev/null +++ b/.buildkite/filebeat/scripts/package.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -euo pipefail + +source .buildkite/env-scripts/linux-env.sh + +echo ":: Evaluate Filebeat Changes ::" + +echo ":: Start Packaging ::" +cd filebeat +umask 0022 +mage package diff --git a/.buildkite/filebeat/scripts/unit-tests-win.ps1 b/.buildkite/filebeat/scripts/unit-tests-win.ps1 new file mode 100644 index 000000000000..8990eb30a093 --- /dev/null +++ b/.buildkite/filebeat/scripts/unit-tests-win.ps1 @@ -0,0 +1,51 @@ +$ErrorActionPreference = "Stop" # set -e +$GoVersion = $env:GOLANG_VERSION # If Choco doesn't have the version specified in .go-version file, should be changed manually + +# Forcing to checkout again all the files with a correct autocrlf. +# Doing this here because we cannot set git clone options before. +function fixCRLF() { + Write-Host "-- Fixing CRLF in git checkout --" + git config core.autocrlf false + git rm --quiet --cached -r . + git reset --quiet --hard +} + +function withGolang() { + Write-Host "-- Install golang $GoVersion --" + choco install golang -y --version $GoVersion + + $choco = Convert-Path "$((Get-Command choco).Path)\..\.." + Import-Module "$choco\helpers\chocolateyProfile.psm1" + refreshenv + go version + go env +} + +function installGoDependencies() { + $installPackages = @( + "github.com/magefile/mage" + "github.com/elastic/go-licenser" + "golang.org/x/tools/cmd/goimports" + "github.com/jstemmer/go-junit-report" + "github.com/tebeka/go2xunit" + ) + foreach ($pkg in $installPackages) { + go install "$pkg" + } +} + +fixCRLF + +$ErrorActionPreference = "Continue" # set +e + +Set-Location -Path filebeat +New-Item -ItemType Directory -Force -Path "build" +withGolang +installGoDependencies + +mage build unitTest + +$EXITCODE=$LASTEXITCODE +$ErrorActionPreference = "Stop" + +Exit $EXITCODE diff --git a/.buildkite/filebeat/scripts/unit-tests.sh b/.buildkite/filebeat/scripts/unit-tests.sh new file mode 100755 index 000000000000..cda1dd85aea2 --- /dev/null +++ b/.buildkite/filebeat/scripts/unit-tests.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +set -euo pipefail + +source .buildkite/env-scripts/linux-env.sh +source .buildkite/env-scripts/macos-env.sh + +echo ":: Execute Unit Tests ::" +sudo chmod -R go-w filebeat/ + +umask 0022 +mage -d filebeat unitTest diff --git a/.buildkite/pull-requests.json b/.buildkite/pull-requests.json index 1b55d6ad4e3a..6eeb4b57f13f 100644 --- a/.buildkite/pull-requests.json +++ b/.buildkite/pull-requests.json @@ -25,12 +25,12 @@ "set_commit_status": true, "build_on_commit": true, "build_on_comment": true, - "trigger_comment_regex": "^(?:(?:buildkite\\W+)?(?:build|test)\\W+(?:this|it))|^/test filebeat$", - "always_trigger_comment_regex": "^(?:(?:buildkite\\W+)?(?:build|test)\\W+(?:this|it))|^/test filebeat$", + "trigger_comment_regex": "^/test filebeat(for (arm|macos|windows|extended support))?$|^/packag[ing|e]$", + "always_trigger_comment_regex": "^/test filebeat(for (arm|macos|windows|extended support))?$|^/package filebeat$", "skip_ci_labels": [ ], "skip_target_branches": [ ], "skip_ci_on_only_changed": [ ], - "always_require_ci_on_changed": [ ] + "always_require_ci_on_changed": ["^filebeat/.*", ".buildkite/filebeat/.*", "^go.mod", "^pytest.ini", "^dev-tools/.*", "^libbeat/.*", "^testing/.*" ] }, { "enabled": true, diff --git a/catalog-info.yaml b/catalog-info.yaml index ba6e1fae0cfe..cf2659162af4 100644 --- a/catalog-info.yaml +++ b/catalog-info.yaml @@ -130,9 +130,15 @@ spec: name: filebeat description: "Filebeat pipeline" spec: +<<<<<<< HEAD # branch_configuration: "main 7.* 8.* v7.* v8.*" TODO: temporarily commented to build PRs from forks pipeline_file: ".buildkite/filebeat/filebeat-pipeline.yml" # maximum_timeout_in_minutes: 120 TODO: uncomment when pipeline is ready +======= + branch_configuration: "main 7.* 8.* v7.* v8.*" + pipeline_file: ".buildkite/filebeat/filebeat-pipeline.yml" + maximum_timeout_in_minutes: 120 +>>>>>>> 730dc87d0e (Filebeat pipeline migration to Buildkite (#37283)) provider_settings: build_pull_request_forks: false build_pull_requests: true # requires filter_enabled and filter_condition settings as below when used with buildkite-pr-bot @@ -145,8 +151,8 @@ spec: cancel_intermediate_builds_branch_filter: "!main !7.* !8.*" skip_intermediate_builds: true skip_intermediate_builds_branch_filter: "!main !7.* !8.*" - # env: - # ELASTIC_PR_COMMENTS_ENABLED: "true" TODO: uncomment when pipeline is ready + env: + ELASTIC_PR_COMMENTS_ENABLED: "true" teams: ingest-fp: access_level: MANAGE_BUILD_AND_READ diff --git a/dev-tools/packaging/package_test.go b/dev-tools/packaging/package_test.go index 81bae8da6c1c..b200066b46d9 100644 --- a/dev-tools/packaging/package_test.go +++ b/dev-tools/packaging/package_test.go @@ -655,9 +655,11 @@ func readZip(zipFile string) (*packageFile, error) { } func readDocker(dockerFile string) (*packageFile, *dockerInfo, error) { - // Read the manifest file first so that the config file and layer - // names are known in advance. - manifest, err := getDockerManifest(dockerFile) + var manifest *dockerManifest + var info *dockerInfo + layers := make(map[string]*packageFile) + + manifest, err := readManifest(dockerFile) if err != nil { return nil, nil, err } @@ -668,9 +670,6 @@ func readDocker(dockerFile string) (*packageFile, *dockerInfo, error) { } defer file.Close() - var info *dockerInfo - layers := make(map[string]*packageFile) - gzipReader, err := gzip.NewReader(file) if err != nil { return nil, nil, err @@ -711,11 +710,7 @@ func readDocker(dockerFile string) (*packageFile, *dockerInfo, error) { // Read layers in order and for each file keep only the entry seen in the later layer p := &packageFile{Name: filepath.Base(dockerFile), Contents: map[string]packageEntry{}} - for _, layer := range manifest.Layers { - layerFile, found := layers[layer] - if !found { - return nil, nil, fmt.Errorf("layer not found: %s", layer) - } + for _, layerFile := range layers { for name, entry := range layerFile.Contents { // Check only files in working dir and entrypoint if strings.HasPrefix("/"+name, workingDir) || "/"+name == entrypoint { @@ -740,22 +735,21 @@ func readDocker(dockerFile string) (*packageFile, *dockerInfo, error) { return p, info, nil } -// getDockerManifest opens a gzipped tar file to read the Docker manifest.json -// that it is expected to contain. -func getDockerManifest(file string) (*dockerManifest, error) { - f, err := os.Open(file) +func readManifest(dockerFile string) (*dockerManifest, error) { + var manifest *dockerManifest + + file, err := os.Open(dockerFile) if err != nil { return nil, err } - defer f.Close() + defer file.Close() - gzipReader, err := gzip.NewReader(f) + gzipReader, err := gzip.NewReader(file) if err != nil { return nil, err } defer gzipReader.Close() - var manifest *dockerManifest tarReader := tar.NewReader(gzipReader) for { header, err := tarReader.Next() @@ -774,8 +768,7 @@ func getDockerManifest(file string) (*dockerManifest, error) { break } } - - return manifest, nil + return manifest, err } type dockerManifest struct { diff --git a/filebeat/filebeat_windows_amd64.syso b/filebeat/filebeat_windows_amd64.syso new file mode 100644 index 0000000000000000000000000000000000000000..c52af94f8e059275dff851e701e42fafefdf4132 GIT binary patch literal 1072 zcmZvcPiqrV6vfY&fOb*nQd|w_QbN*yQBY7!i-l0dN^vLCB#i?x6Ot*k(1j~MPnT}| z7B2f8`VGY2dDEAF_$Kq-efOPr?!D)|N&jaewmS_v}vGgN)sxb-p=70Pf@Vy80h4-2mv0c}8F;8`(uk;7{gmt73I%a-Ee~9h& zyMFiA*=*`jn8z_p?z^g{h3EExcburyRJq0)!j@~K0nB5as{r~dV47Neq)R-B-huhf zGXP&|2EKRi$(g^_w)WLlrWV!?U|^3TYmq!ORo4LK(Bx;jLh;di~0fGJ>biqB-LT?#j`|Hx0ei~6oN+*`lHFHx{AzxVk%8;6vk~%W>futpr>sxm z&0($bd;zY777dhe8|f+dF1R5~qi`Y<;<>^htR)zq(yA|A)9HDr#EQ9QXO-$LH>1nA zJ&)Dlb|u`mirY8F5uULXtKIz vU72-VI_yjMVytfDT-<8unfw8NX4Q9SMjbx&h0c!a@6oNM<$r-Bc)a@`Kf{eH literal 0 HcmV?d00001 diff --git a/filebeat/input/filestream/internal/task/group_test.go b/filebeat/input/filestream/internal/task/group_test.go new file mode 100644 index 000000000000..5ce15d455e3e --- /dev/null +++ b/filebeat/input/filestream/internal/task/group_test.go @@ -0,0 +1,344 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package task + +import ( + "context" + "errors" + "fmt" + "math/rand" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type noopLogger struct{} + +func (n noopLogger) Errorf(string, ...interface{}) {} + +type testLogger strings.Builder + +func (tl *testLogger) Errorf(format string, args ...interface{}) { + sb := (*strings.Builder)(tl) + sb.WriteString(fmt.Sprintf(format, args...)) + sb.WriteString("\n") +} +func (tl *testLogger) String() string { + return (*strings.Builder)(tl).String() +} + +func TestNewGroup(t *testing.T) { + limit := 10 + timeout := time.Second + g := NewGroup(uint64(limit), timeout, noopLogger{}, "") + require.NotNil(t, g, "NewGroup returned a nil group, it cannot be nil") + + require.NotNil(t, g.sem) + + err := g.sem.Acquire(context.Background(), int64(limit-1)) + require.NoError(t, err, "semaphore Acquire failed") + assert.True(t, g.sem.TryAcquire(1), + "semaphore should have 1 place left, there is none") + assert.False(t, g.sem.TryAcquire(1), + "semaphore NOT should have any place left, but there is") + + assert.NotNil(t, g.logErr) + assert.Equal(t, timeout, g.stopTimeout) +} + +func TestGroup_Go(t *testing.T) { + t.Run("don't run more than limit goroutines", func(t *testing.T) { + done := make(chan struct{}) + defer close(done) + runningCount := atomic.Uint64{} + blocked := func(_ context.Context) error { + runningCount.Add(1) + <-done + return nil + } + + want := uint64(2) + g := NewGroup(want, time.Second, noopLogger{}, "") + + err := g.Go(blocked) + require.NoError(t, err) + err = g.Go(blocked) + require.NoError(t, err) + err = g.Go(blocked) + require.NoError(t, err) + + assert.Eventually(t, + func() bool { return want == runningCount.Load() }, + time.Second, 100*time.Millisecond) + }) + + t.Run("workloads wait for available worker", func(t *testing.T) { + runningCount := atomic.Int64{} + doneCount := atomic.Int64{} + + limit := uint64(2) + g := NewGroup(limit, time.Second, noopLogger{}, "") + + done1 := make(chan struct{}) + f1 := func(_ context.Context) error { + defer t.Log("f1 done") + defer doneCount.Add(1) + + runningCount.Add(1) + defer runningCount.Add(-1) + + t.Log("f1 started") + <-done1 + return errors.New("f1") + } + + var f2Finished atomic.Bool + done2 := make(chan struct{}) + f2 := func(_ context.Context) error { + defer t.Log("f2 done") + defer doneCount.Add(1) + + runningCount.Add(1) + + t.Log("f2 started") + <-done2 + + f2Finished.Store(true) + + runningCount.Add(-1) + return errors.New("f2") + } + + var f3Started atomic.Bool + done3 := make(chan struct{}) + f3 := func(_ context.Context) error { + defer t.Log("f3 done") + defer doneCount.Add(1) + + f3Started.Store(true) + runningCount.Add(1) + + defer runningCount.Add(-1) + t.Log("f3 started") + <-done3 + return errors.New("f3") + } + + err := g.Go(f1) + require.NoError(t, err) + err = g.Go(f2) + require.NoError(t, err) + + // Wait to ensure f1 and f2 are running, thus there is no workers free. + assert.Eventually(t, + func() bool { return int64(2) == runningCount.Load() }, + 100*time.Millisecond, time.Millisecond) + + err = g.Go(f3) + require.NoError(t, err) + assert.False(t, f3Started.Load()) + + close(done2) + + assert.Eventually(t, + func() bool { + return f3Started.Load() + }, + 100*time.Millisecond, time.Millisecond) + + // If f3 started, f2 must have finished + assert.True(t, f2Finished.Load()) + assert.Equal(t, int64(limit), runningCount.Load()) + + close(done1) + close(done3) + + t.Log("waiting the worker pool to finish all workloads") + err = g.Stop() + assert.NoError(t, err) + t.Log("worker pool to finished all workloads") + + assert.Eventually(t, + func() bool { return doneCount.Load() == 3 }, + 50*time.Millisecond, + time.Millisecond, + "not all goroutines finished") + }) + + t.Run("return error if the group is closed", func(t *testing.T) { + g := NewGroup(1, time.Second, noopLogger{}, "") + err := g.Stop() + require.NoError(t, err) + + err = g.Go(func(_ context.Context) error { return nil }) + assert.ErrorIs(t, err, context.Canceled) + }) + + t.Run("without limit, all goroutines run", func(t *testing.T) { + // 100 <= limit <= 100000 + limit := rand.Int63n(100000-100) + 100 + t.Logf("running %d goroutines", limit) + g := NewGroup(uint64(limit), time.Second, noopLogger{}, "") + + done := make(chan struct{}) + var runningCounter atomic.Int64 + var i int64 + for i = 0; i < limit; i++ { + err := g.Go(func(context.Context) error { + runningCounter.Add(1) + defer runningCounter.Add(-1) + + <-done + return nil + }) + require.NoError(t, err) + } + + assert.Eventually(t, + func() bool { return limit == runningCounter.Load() }, + 100*time.Millisecond, + time.Millisecond) + + close(done) + err := g.Stop() + require.NoError(t, err) + }) + + t.Run("all workloads return an error", func(t *testing.T) { + logger := &testLogger{} + runCunt := atomic.Uint64{} + wg := sync.WaitGroup{} + + wantErr := errors.New("a error") + workload := func(i int) func(context.Context) error { + return func(_ context.Context) error { + defer runCunt.Add(1) + defer wg.Done() + return fmt.Errorf("[%d]: %w", i, wantErr) + } + } + + want := uint64(2) + g := NewGroup(want, time.Second, logger, "errorPrefix") + + wg.Add(1) + err := g.Go(workload(1)) + require.NoError(t, err) + wg.Wait() + + wg.Add(1) + err = g.Go(workload(2)) + require.NoError(t, err) + wg.Wait() + + err = g.Stop() + + require.NoError(t, err) + logs := logger.String() + assert.Contains(t, logs, wantErr.Error()) + assert.Contains(t, logs, "[2]") + assert.Contains(t, logs, "[1]") + }) + + t.Run("some workloads return an error", func(t *testing.T) { + wantErr := errors.New("a error") + logger := &testLogger{} + want := uint64(2) + + g := NewGroup(want, time.Second, logger, "") + + err := g.Go(func(_ context.Context) error { return nil }) + require.NoError(t, err) + err = g.Go(func(_ context.Context) error { return wantErr }) + require.NoError(t, err) + + time.Sleep(time.Millisecond) + + err = g.Stop() + + assert.NoError(t, err) + assert.Contains(t, logger.String(), wantErr.Error()) + }) + + t.Run("workload returns no error", func(t *testing.T) { + done := make(chan struct{}) + runningCount := atomic.Uint64{} + wg := sync.WaitGroup{} + + bloked := func(i int) func(context.Context) error { + return func(_ context.Context) error { + runningCount.Add(1) + defer wg.Done() + + <-done + return nil + } + } + + want := uint64(2) + g := NewGroup(want, time.Second, noopLogger{}, "") + + wg.Add(2) + err := g.Go(bloked(1)) + require.NoError(t, err) + err = g.Go(bloked(2)) + require.NoError(t, err) + + close(done) + wg.Wait() + + err = g.Stop() + + assert.NoError(t, err) + }) +} + +func TestGroup_Stop(t *testing.T) { + t.Run("timeout", func(t *testing.T) { + + g := NewGroup(1, time.Nanosecond, noopLogger{}, "") + + done := make(chan struct{}) + defer func() { close(done) }() + err := g.Go(func(_ context.Context) error { + <-done + return nil + }) + require.NoError(t, err, "could not launch goroutine") + + time.Sleep(time.Nanosecond) + + err = g.Stop() + assert.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("all goroutine finish before timeout", func(t *testing.T) { + g := NewGroup(1, 50*time.Millisecond, noopLogger{}, "") + + err := g.Go(func(_ context.Context) error { return nil }) + require.NoError(t, err, "could not launch goroutine") + + err = g.Stop() + assert.NoError(t, err) + }) +} diff --git a/filebeat/tests/system/test_crawler.py b/filebeat/tests/system/test_crawler.py index 2b3fb4f3eeac..32a6bcbf7ed2 100644 --- a/filebeat/tests/system/test_crawler.py +++ b/filebeat/tests/system/test_crawler.py @@ -197,7 +197,10 @@ def test_file_renaming(self): # expecting 6 more events self.wait_until( - lambda: self.output_has(lines=iterations1 + iterations2), max_timeout=10) + lambda: self.output_has( + lines=iterations1 + + iterations2), + max_timeout=10) filebeat.check_kill_and_wait() @@ -247,7 +250,10 @@ def test_file_disappear(self): # Let it read the file self.wait_until( - lambda: self.output_has(lines=iterations1 + iterations2), max_timeout=10) + lambda: self.output_has( + lines=iterations1 + + iterations2), + max_timeout=10) filebeat.check_kill_and_wait() @@ -317,7 +323,10 @@ def test_file_disappear_appear(self): # Let it read the file self.wait_until( - lambda: self.output_has(lines=iterations1 + iterations2), max_timeout=10) + lambda: self.output_has( + lines=iterations1 + + iterations2), + max_timeout=10) filebeat.check_kill_and_wait() @@ -468,7 +477,8 @@ def test_tail_files(self): f.write("hello world 2\n") f.flush() - # Sleep 1 second to make sure the file is persisted on disk and timestamp is in the past + # Sleep 1 second to make sure the file is persisted on disk and + # timestamp is in the past time.sleep(1) filebeat = self.start_beat() @@ -569,6 +579,7 @@ def test_encodings(self): with codecs.open(self.working_dir + "/log/test-{}".format(enc_py), "w", enc_py) as f: f.write(text + "\n") + f.close() # create the config file inputs = [] @@ -592,10 +603,11 @@ def test_encodings(self): with codecs.open(self.working_dir + "/log/test-{}".format(enc_py), "a", enc_py) as f: f.write(text + " 2" + "\n") + f.close() # wait again self.wait_until(lambda: self.output_has(lines=len(encodings) * 2), - max_timeout=15) + max_timeout=60) filebeat.check_kill_and_wait() # check that all outputs are present in the JSONs in UTF-8