Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade ruby to v3.2 as 2.7 is not maintained anymore. #345

Merged
merged 12 commits into from
Oct 18, 2023
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Setup Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 2.7
ruby-version: 3.2
bundler-cache: true

- name: Starting up MySQL
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests_5.7.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Setup Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 2.7
ruby-version: 3.2
bundler-cache: true

- name: Starting up MySQL
Expand Down
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ source "https://rubygems.org"
group :test do
gem "minitest"
gem "mysql2"
gem "webrick"

gem "minitest-hooks"
gem "minitest-reporters", "~> 1.4"
gem "minitest-retry"
gem "minitest-fail-fast", "~> 0.1.0"
end

Expand Down
23 changes: 14 additions & 9 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,28 @@ GEM
byebug (11.1.3)
coderay (1.1.3)
method_source (1.0.0)
minitest (5.14.4)
minitest (5.20.0)
minitest-fail-fast (0.1.0)
minitest (~> 5)
minitest-hooks (1.5.0)
minitest-hooks (1.5.1)
minitest (> 5.3)
minitest-reporters (1.4.3)
minitest-reporters (1.6.1)
ansi
builder
minitest (>= 5.0)
ruby-progressbar
mysql2 (0.5.3)
pry (0.13.1)
minitest-retry (0.2.2)
minitest (>= 5.0)
mysql2 (0.5.5)
pry (0.14.2)
coderay (~> 1.1)
method_source (~> 1.0)
pry-byebug (3.9.0)
pry-byebug (3.10.1)
byebug (~> 11.0)
pry (~> 0.13.0)
ruby-progressbar (1.11.0)
tqdm (0.3.0)
pry (>= 0.13, < 0.15)
ruby-progressbar (1.13.0)
tqdm (0.4.1)
webrick (1.8.1)

PLATFORMS
ruby
Expand All @@ -34,9 +37,11 @@ DEPENDENCIES
minitest-fail-fast (~> 0.1.0)
minitest-hooks
minitest-reporters (~> 1.4)
minitest-retry
mysql2
pry-byebug
tqdm
webrick

BUNDLED WITH
2.2.22
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ test-go:

test-ruby:
bundle install
ruby test/main.rb
bundle exec ruby test/main.rb

test: test-go test-ruby

Expand Down
2 changes: 1 addition & 1 deletion dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ up:
or: [mysql@5.7]
conflicts: [mysql-connector-c, mysql, mysql-client]

- ruby: "2.7.3"
- ruby: "3.2.2"
- bundler
- go:
version: "1.16"
Expand Down
41 changes: 21 additions & 20 deletions test/helpers/ghostferry_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ def self.remove_all_binaries
FileUtils.remove_entry(GHOSTFERRY_TEMPDIR) if Dir.exist?(GHOSTFERRY_TEMPDIR)
end

class GhostferryExitFailure < StandardError
end

class Ghostferry
# Manages compiling, running, and communicating with Ghostferry.
#
#
# To use this class:
#
# ghostferry = Ghostferry.new("path/to/main.go")
# ghostferry = Ghostferry.new("path/to/main.go", logger: Logger.new(STDOUT))
# ghostferry.on_status(Ghostferry::Status::BEFORE_ROW_COPY) do
# # do custom work here, such as injecting data into the database
# end
Expand All @@ -32,6 +30,10 @@ class Ghostferry
# Keep these in sync with integrationferry.go
ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT"

Error = Class.new(StandardError)
ExitError = Class.new(Error)
TimeoutError = Class.new(Error)

module Status
# This should be in sync with integrationferry.go
READY = "READY"
Expand All @@ -49,12 +51,8 @@ module Status

attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines

def initialize(main_path, config: {}, logger: nil, message_timeout: 30, port: 39393)
def initialize(main_path, config: {}, logger:, message_timeout: 30, port: 39393)
driv3r marked this conversation as resolved.
Show resolved Hide resolved
@logger = logger
if @logger.nil?
@logger = Logger.new(STDOUT)
@logger.level = Logger::DEBUG
end

@main_path = main_path
@config = config
Expand Down Expand Up @@ -116,7 +114,7 @@ def run(resuming_state = nil)
# stopped properly (if you're using stop_datawriter_during_cutover).
def run_expecting_interrupt(resuming_state = nil)
run(resuming_state)
rescue GhostferryExitFailure
rescue ExitError
dumped_state = @stdout.join("")
JSON.parse(dumped_state)
else
Expand All @@ -127,7 +125,7 @@ def run_expecting_interrupt(resuming_state = nil)
# stopped properly (if you're using stop_datawriter_during_cutover).
def run_expecting_failure(resuming_state = nil)
run(resuming_state)
rescue GhostferryExitFailure
rescue ExitError
else
raise "Ghostferry did not fail"
end
Expand Down Expand Up @@ -156,11 +154,12 @@ def compile_binary
def start_server
@server_last_error = nil

@last_message_time = Time.now
@last_message_time = now
@server = WEBrick::HTTPServer.new(
BindAddress: "127.0.0.1",
Port: @server_port,
Logger: @logger,
MaxClients: 1024,
AccessLog: [],
)

Expand All @@ -174,18 +173,16 @@ def start_server

query = CGI::parse(req.body)

status = query["status"]
status = Array(query["status"]).first
data = query["data"]

unless status
if status.nil?
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status")
resp.status = 400
@server.shutdown
end

status = status.first

@last_message_time = Time.now
@last_message_time = now
@status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil?
rescue StandardError => e
# errors are not reported from WEBrick but the server should fail early
Expand Down Expand Up @@ -317,7 +314,7 @@ def start_ghostferry(resuming_state = nil)

@logger.debug("ghostferry test binary exitted: #{@exit_status}")
if @exit_status.exitstatus != 0
raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}"
raise ExitError, "ghostferry test binary returned non-zero status: #{@exit_status}"
end
end
end
Expand All @@ -328,9 +325,9 @@ def start_server_watchdog
# HTTP server to free up the port.
@server_watchdog_thread = Thread.new do
while @subprocess_thread.alive? do
if Time.now - @last_message_time > @message_timeout
if (now - @last_message_time) > @message_timeout
@server.shutdown
raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s"
raise TimeoutError, "ghostferry did not report to the integration test server for the last #{@message_timeout}s"
end

sleep 1
Expand Down Expand Up @@ -387,7 +384,7 @@ def kill

begin
@subprocess_thread.join if @subprocess_thread
rescue GhostferryExitFailure
rescue ExitError
# ignore
end
end
Expand All @@ -405,5 +402,9 @@ def with_env(key, value)
ensure
ENV[key] = previous_value
end

def now
::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end
end
end
12 changes: 3 additions & 9 deletions test/integration/interrupt_resume_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,10 @@ def test_interrupt_and_resume_without_last_known_schema_cache
def test_interrupt_resume_with_writes_to_source
# Start a ghostferry run expecting it to be interrupted.
datawriter = new_source_datawriter
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2)

start_datawriter_with_ghostferry(datawriter, ghostferry)

batches_written = 0
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
batches_written += 1
if batches_written >= 2
ghostferry.send_signal("TERM")
end
end

dumped_state = ghostferry.run_expecting_interrupt
assert_basic_fields_exist_in_dumped_state(dumped_state)

Expand Down Expand Up @@ -465,10 +457,12 @@ def test_interrupt_resume_idempotence_with_multiple_interrupts_and_writes_to_sou
assert_basic_fields_exist_in_dumped_state(dumped_state)

ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2)

ghostferry.run_expecting_interrupt(dumped_state)

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
stop_datawriter_during_cutover(datawriter, ghostferry)

ghostferry.run_with_logs(dumped_state)

assert_test_table_is_identical
Expand Down
2 changes: 2 additions & 0 deletions test/main.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

require "minitest"
require "minitest/reporters"
require "minitest/retry"
require "minitest/fail_fast"
require "minitest/hooks/test"

Minitest::Reporters.use! Minitest::Reporters::SpecReporter.new
Minitest::Retry.use!(exceptions_to_retry: [GhostferryHelper::Ghostferry::TimeoutError])

test_files.each do |f|
require f
Expand Down
3 changes: 2 additions & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ def new_ghostferry(filepath, config: {})
end

def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0)
g = new_ghostferry(filepath, config)
g = new_ghostferry(filepath, config: config)

batches_written = 0
g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
batches_written += 1

if batches_written >= after_batches_written
g.send_signal("TERM")
end
Expand Down
Loading