Skip to content

Commit

Permalink
fixing rac maint concurrency issues
Browse files Browse the repository at this point in the history
  • Loading branch information
rajesh-1983 committed Sep 23, 2024
1 parent bab66f9 commit d8670b9
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 7 deletions.
11 changes: 6 additions & 5 deletions lib/racmaint.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func InitRacMaint(cmdLineModuleName string) {
}

// racMaintMain wakes up every n seconds (configured in "rac_sql_interval") and reads the table
//
// [ManagementTablePrefix]_maint table to see if maintenance is requested
func racMaintMain(shard int, interval int, cmdLineModuleName string) {
if logger.GetLogger().V(logger.Debug) {
Expand Down Expand Up @@ -109,8 +110,8 @@ func racMaintMain(shard int, interval int, cmdLineModuleName string) {
}

/*
racMaint is the main function for RAC maintenance processing, being called regularly.
When maintenance is planned, it calls workerpool.RacMaint to start the actuall processing
racMaint is the main function for RAC maintenance processing, being called regularly.
When maintenance is planned, it calls workerpool.RacMaint to start the actuall processing
*/
func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLineModuleName string, prev map[racCfgKey]racCfg) {
//
Expand Down Expand Up @@ -220,12 +221,12 @@ func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLine
workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRW, 0, shard)
}
if err == nil {
go workerpool.RacMaint(racReq)
workerpool.RacMaint(racReq)
}
if GetConfig().ReadonlyPct > 0 {
workerpool, err := GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard)
workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard)
if err == nil {
go workerpool.RacMaint(racReq)
workerpool.RacMaint(racReq)
}
}
prev[cfgKey] = row
Expand Down
6 changes: 4 additions & 2 deletions lib/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ func (pool *WorkerPool) WorkerReady(worker *WorkerClient) (err error) {
// GetWorker gets the active worker if available. backlog with timeout if not.
//
// @param sqlhash to check for soft eviction against a blacklist of slow queries.
// if getworker needs to exam the incoming sql, there does not seem to be another elegant
// way to do this except to pass in the sqlhash as a parameter.
//
// if getworker needs to exam the incoming sql, there does not seem to be another elegant
// way to do this except to pass in the sqlhash as a parameter.
//
// @param timeoutMs[0] timeout in milliseconds. default to adaptive queue timeout.
func (pool *WorkerPool) GetWorker(sqlhash int32, timeoutMs ...int) (worker *WorkerClient, t string, err error) {
if logger.GetLogger().V(logger.Debug) {
Expand Down
118 changes: 118 additions & 0 deletions tests/unittest/rac_maint_async/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package main

import (
"context"
"database/sql"
"fmt"
"math/rand"
"os"
"testing"
"time"

"github.com/paypal/hera/tests/unittest/testutil"
"github.com/paypal/hera/utility/logger"
)

var mx testutil.Mux
var tableName string

func cfg() (map[string]string, map[string]string, testutil.WorkerType) {

appcfg := make(map[string]string)
// best to chose an "unique" port in case golang runs tests in paralel
appcfg["bind_port"] = "31002"
appcfg["log_level"] = "5"
appcfg["log_file"] = "hera.log"
appcfg["sharding_cfg_reload_interval"] = "0"
appcfg["rac_sql_interval"] = "1"

opscfg := make(map[string]string)
opscfg["opscfg.default.server.max_connections"] = "10"
opscfg["opscfg.default.server.log_level"] = "5"

//return appcfg, opscfg, testutil.OracleWorker
return appcfg, opscfg, testutil.MySQLWorker
}

func before() error {
os.Setenv("PARALLEL", "1")
pfx := os.Getenv("MGMT_TABLE_PREFIX")
if pfx == "" {
pfx = "hera"
}
tableName = pfx + "_maint"
return nil
}

func TestMain(m *testing.M) {
os.Exit(testutil.UtilMain(m, cfg, before))
}

func TestRacMaintWithRandomStatusChangeInAsync(t *testing.T) {
logger.GetLogger().Log(logger.Debug, "TestRacMaintWithRandomStatusChangeInAsync begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
shard := 0
db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard))
if err != nil {
t.Fatal("Error starting Mux:", err)
return
}
statusArray := []string{"U", "R", "F"}
time.Sleep(5 * time.Second)

go func() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
for {
status1 := rand.Intn(len(statusArray))
status2 := rand.Intn(len(statusArray))
var err error
var conn *sql.Conn
// cleanup and insert one row in the table
conn, err = db.Conn(ctx)
if err != nil {
t.Fatalf("Error getting connection %s\n", err.Error())
}
tx, _ := conn.BeginTx(ctx, nil)
stmt, _ := tx.PrepareContext(ctx, "/*cmd*/delete from "+tableName)
_, err = stmt.Exec()
if err != nil {
t.Fatalf("Error preparing test (delete table) %s\n", err.Error())
}
stmt, _ = tx.PrepareContext(ctx, "/*cmd*/insert into "+tableName+" (inst_id, status, status_time, module, machine) values (?,?,?,?,?)")
hostname, _ := os.Hostname()
// how to do inst_id
_, err = stmt.Exec(0 /*max instid*/, statusArray[status1], time.Now().Unix()+2, "hera-test", hostname)
_, err = stmt.Exec(0, statusArray[status2], time.Now().Unix()+2, "hera-test_taf", hostname)
if err != nil {
t.Fatalf("Error preparing test (create row in table) %s\n", err.Error())
}
err = tx.Commit()
if err != nil {
t.Fatalf("Error commit %s\n", err.Error())
}
conn.Close()
time.Sleep(1000 * time.Millisecond)
}
}()
if err != nil {
t.Fatal("Error starting Mux:", err)
return
}
db.SetMaxIdleConns(0)
defer db.Close()

time.Sleep(45000 * time.Millisecond)

if 0 == testutil.RegexCountFile("Rac maint activating, worker", "hera.log") {
t.Fatalf("requires rac maint activation for main module status")
}

if 0 == testutil.RegexCountFile("module:HERA-TEST_TAF", "cal.log") {
t.Fatalf("Status 'U' should log the RACMAINT_INFO_CHANGE event")
}
if 0 != testutil.RegexCountFile("invalid_status", "cal.log") {
t.Fatalf("ram maint status 'U' should not skip with invalid-status event")
}

logger.GetLogger().Log(logger.Debug, "TestRacMaintWithRandomStatusChangeInAsync done -------------------------------------------------------------")
}

0 comments on commit d8670b9

Please sign in to comment.