Skip to content

Commit

Permalink
changes for adding properties to callog to track rac timestamp value (#…
Browse files Browse the repository at this point in the history
…391)

* changes for adding properties to callog to track rac timestamp value

* added pid for worker process id in cal log

* incorporate review comments for moving event logic to same loop

* fixing rac maint concurrency issues

* changes for racmaint in occ

* fixing test bind less error

* fixing test bind throttle error

---------

Co-authored-by: Rajesh S <samal.rajesh@gmail.com>
  • Loading branch information
rasamala83 and rajesh-1983 authored Sep 26, 2024
1 parent cc38b95 commit 4af3606
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 33 deletions.
14 changes: 8 additions & 6 deletions lib/racmaint.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ func InitRacMaint(cmdLineModuleName string) {
interval := GetConfig().RacMaintReloadInterval
if interval > 0 {
for i := 0; i < GetConfig().NumOfShards; i++ {
go racMaintMain(i, interval, cmdLineModuleName)
shardIndex := i //Address the behavior called variable capture.
go racMaintMain(shardIndex, interval, cmdLineModuleName)
}
}
}

// racMaintMain wakes up every n seconds (configured in "rac_sql_interval") and reads the table
//
// [ManagementTablePrefix]_maint table to see if maintenance is requested
func racMaintMain(shard int, interval int, cmdLineModuleName string) {
if logger.GetLogger().V(logger.Debug) {
Expand Down Expand Up @@ -109,8 +111,8 @@ func racMaintMain(shard int, interval int, cmdLineModuleName string) {
}

/*
racMaint is the main function for RAC maintenance processing, being called regularly.
When maintenance is planned, it calls workerpool.RacMaint to start the actual processing
racMaint is the main function for RAC maintenance processing, being called regularly.
When maintenance is planned, it calls workerpool.RacMaint to start the actual processing
*/
func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLineModuleName string, prev map[racCfgKey]racCfg) {
//
Expand Down Expand Up @@ -220,12 +222,12 @@ func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLine
workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRW, 0, shard)
}
if err == nil {
go workerpool.RacMaint(racReq)
workerpool.RacMaint(racReq)
}
if GetConfig().ReadonlyPct > 0 {
workerpool, err := GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard)
workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard)
if err == nil {
go workerpool.RacMaint(racReq)
workerpool.RacMaint(racReq)
}
}
prev[cfgKey] = row
Expand Down
50 changes: 25 additions & 25 deletions lib/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error {
worker.setState(wsSchd)
millis := rand.Intn(GetConfig().RandomStartMs)
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, wid, "randomized start ms",millis)
logger.GetLogger().Log(logger.Alert, wid, "randomized start ms", millis)
}
time.Sleep(time.Millisecond * time.Duration(millis))

Expand All @@ -131,7 +131,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error {
}
millis := rand.Intn(3000)
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start",wid)
logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start", wid)
}
time.Sleep(time.Millisecond * time.Duration(millis))
}
Expand Down Expand Up @@ -233,8 +233,10 @@ func (pool *WorkerPool) WorkerReady(worker *WorkerClient) (err error) {
// GetWorker gets the active worker if available. backlog with timeout if not.
//
// @param sqlhash to check for soft eviction against a blacklist of slow queries.
// if getworker needs to exam the incoming sql, there does not seem to be another elegant
// way to do this except to pass in the sqlhash as a parameter.
//
// if getworker needs to exam the incoming sql, there does not seem to be another elegant
// way to do this except to pass in the sqlhash as a parameter.
//
// @param timeoutMs[0] timeout in milliseconds. default to adaptive queue timeout.
func (pool *WorkerPool) GetWorker(sqlhash int32, timeoutMs ...int) (worker *WorkerClient, t string, err error) {
if logger.GetLogger().V(logger.Debug) {
Expand Down Expand Up @@ -559,10 +561,10 @@ func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err e
}
if skipRecycle {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:=", pool.desiredSize)
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:=", pool.desiredSize)
}
calMsg := fmt.Sprintf("Recycle(worker_pid)=%d, module_name=%s,shard_id=%d", worker.pid, worker.moduleName, worker.shardID)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","ReturnWorker", cal.TransOK, calMsg)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "ReturnWorker", cal.TransOK, calMsg)
evt.Completed()
}

Expand Down Expand Up @@ -697,8 +699,6 @@ func (pool *WorkerPool) RacMaint(racReq racAct) {
}
now := time.Now().Unix()
window := GetConfig().RacRestartWindow
dbUname := ""
cnt := 0
pool.poolCond.L.Lock()
for i := 0; i < pool.currentSize; i++ {
if (pool.workers[i] != nil) && (racReq.instID == 0 || pool.workers[i].racID == racReq.instID) && (pool.workers[i].startTime < int64(racReq.tm)) {
Expand All @@ -716,23 +716,23 @@ func (pool *WorkerPool) RacMaint(racReq racAct) {
}

if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize)
}
cnt++
if len(dbUname) == 0 {
dbUname = pool.workers[i].dbUname
logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize, "rac.req timestamp=", racReq.tm)
}
//Trigger individual event for worker
evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "")
evt.AddDataStr("poolModName", pool.moduleName)
evt.AddDataInt("workerId", int64(i))
evt.AddDataInt("pid", int64(pool.workers[i].pid))
evt.AddDataInt("shardId", int64(pool.ShardID))
evt.AddDataInt("tm", int64(racReq.tm))
evt.AddDataInt("exitTime", pool.workers[i].exitTime)
evt.AddDataStr("exitInSec", fmt.Sprintf("%dsec", pool.workers[i].exitTime-now))
evt.Completed()
evt = cal.NewCalEvent("DB_UNAME", pool.workers[i].dbUname, cal.TransOK, "")
evt.Completed()
}
}
pool.poolCond.L.Unlock()
// TODO: C++ worker logs one event for each worker, in the worker, so
// we keep the same. Think about changing it
for i := 0; i < cnt; i++ {
evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "")
evt.Completed()
evt = cal.NewCalEvent("DB_UNAME", dbUname, cal.TransOK, "")
evt.Completed()
}
}

// checkWorkerLifespan is called periodically to check if any worker lifetime has expired and terminates it
Expand Down Expand Up @@ -768,12 +768,12 @@ func (pool *WorkerPool) checkWorkerLifespan() {
pool.poolCond.L.Lock()
for i := 0; i < pool.currentSize; i++ {
if (pool.workers[i] != nil) && (pool.workers[i].exitTime != 0) && (pool.workers[i].exitTime <= now) {
if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize*GetConfig().MaxDesiredHealthyWorkerPct/100)) { // Should it be a config value
if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize * GetConfig().MaxDesiredHealthyWorkerPct / 100)) { // Should it be a config value
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize)
}
calMsg := fmt.Sprintf("module_name=%s,shard_id=%d", pool.moduleName, pool.ShardID)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","checkWorkerLifespan", cal.TransOK, calMsg)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "checkWorkerLifespan", cal.TransOK, calMsg)
evt.Completed()
break
}
Expand Down Expand Up @@ -814,7 +814,7 @@ func (pool *WorkerPool) checkWorkerLifespan() {
pool.poolCond.L.Unlock()
for _, w := range workers {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID ,"HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize)
}
w.Terminate()
}
Expand Down
2 changes: 1 addition & 1 deletion tests/unittest/bindLess/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestBindLess(t *testing.T) {
logger.GetLogger().Log(logger.Debug, "TestBindLess +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
testutil.BackupAndClear("cal", "BindLess start")
testutil.BackupAndClear("hera", "BindLess start")
err := partialBadLoad(0.10)
err := partialBadLoad(0.07)
if err != nil && err != NormCliErr() {
t.Fatalf("main step function returned err %s", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion tests/unittest/bindThrottle/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func mkClients(num int, stop *int, bindV int, grpName string, outErr *string, db
func TestBindThrottle(t *testing.T) {
// we would like to clear hera.log, but even if we try, lots of messages still go there
logger.GetLogger().Log(logger.Debug, "BindThrottle +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
err := partialBadLoad(0.10)
err := partialBadLoad(0.07)
if err != nil && err != NormCliErr() {
t.Fatalf("main step function returned err %s", err.Error())
}
Expand Down
122 changes: 122 additions & 0 deletions tests/unittest/rac_maint_async/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"context"
"database/sql"
"fmt"
"math/rand"
"os"
"testing"
"time"

"github.com/paypal/hera/tests/unittest/testutil"
"github.com/paypal/hera/utility/logger"
)

var mx testutil.Mux
var tableName string

// cfg returns the settings used by this test: the application configuration,
// the ops configuration, and the worker type to run with.
func cfg() (map[string]string, map[string]string, testutil.WorkerType) {
	// bind_port should be a port no other test uses, in case Go runs the
	// tests in parallel.
	appSettings := map[string]string{
		"bind_port":                    "31002",
		"log_level":                    "5",
		"log_file":                     "hera.log",
		"sharding_cfg_reload_interval": "0",
		"rac_sql_interval":             "1",
	}

	opsSettings := map[string]string{
		"opscfg.default.server.max_connections": "10",
		"opscfg.default.server.log_level":       "5",
	}

	// To use the Oracle worker instead, return testutil.OracleWorker here.
	return appSettings, opsSettings, testutil.MySQLWorker
}

// before is the setup hook that TestMain hands to testutil.UtilMain. It sets
// PARALLEL=1 in the process environment and stores the maintenance table name
// in tableName: the value of MGMT_TABLE_PREFIX (or "hera" when that variable
// is empty or unset) followed by "_maint".
//
// It returns an error only when the environment variable cannot be set.
func before() error {
	if err := os.Setenv("PARALLEL", "1"); err != nil {
		return fmt.Errorf("setting PARALLEL: %w", err)
	}
	pfx := os.Getenv("MGMT_TABLE_PREFIX")
	if pfx == "" {
		pfx = "hera"
	}
	tableName = pfx + "_maint"
	return nil
}

// TestMain passes the test run, together with the cfg and before hooks, to
// testutil.UtilMain and exits the process with the code it returns.
func TestMain(m *testing.M) {
	exitCode := testutil.UtilMain(m, cfg, before)
	os.Exit(exitCode)
}

// TestRacMaintWithRandomStatusChangeInAsync rewrites the maintenance table about
// once a second with randomly chosen statuses for the "hera-test" and
// "hera-test_taf" modules, lets that run for 45 seconds, and then checks
// hera.log and cal.log for the expected RAC maintenance entries.
//
// The table writer runs in its own goroutine. It never touches t: failures are
// sent back over a channel and reported from the test goroutine, and the writer
// is stopped and joined before the log checks so that it cannot outlive the
// test or use db after it has been closed.
func TestRacMaintWithRandomStatusChangeInAsync(t *testing.T) {
	logger.GetLogger().Log(logger.Debug, "TestRacMaintWithRandomStatusChangeInAsync begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
	shard := 0
	db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard))
	if err != nil {
		t.Fatal("Error starting Mux:", err)
	}
	defer db.Close()
	db.SetMaxIdleConns(0)

	statusArray := []string{"U", "R", "F"}
	// NOTE(review): presumably gives the server time to finish starting
	// before the first write — confirm.
	time.Sleep(5 * time.Second)

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Buffered so the writer can always deliver its result and exit, even
	// when the test has already failed and nobody is receiving.
	writerDone := make(chan error, 1)
	go func() {
		writerDone <- churnMaintStatus(ctx, db, statusArray)
	}()

	select {
	case err := <-writerDone:
		t.Fatalf("maintenance table writer stopped early: %v", err)
	case <-time.After(45000 * time.Millisecond):
	}

	// Stop the writer and wait for it before inspecting the logs.
	cancel()
	if err := <-writerDone; err != nil {
		t.Fatalf("maintenance table writer failed: %v", err)
	}

	if 0 == testutil.RegexCountFile("Rac maint activating, worker", "hera.log") {
		t.Fatalf("requires rac maint activation for main module status")
	}

	if 0 == testutil.RegexCountFile("module:HERA-TEST_TAF", "cal.log") {
		t.Fatalf("Status 'U' should log the RACMAINT_INFO_CHANGE event")
	}
	if 0 != testutil.RegexCountFile("invalid_status", "cal.log") {
		t.Fatalf("ram maint status 'U' should not skip with invalid-status event")
	}

	if testutil.RegexCountFile("RAC_ID", "cal.log") < 20 {
		t.Fatalf("ram maint should trigger for all workers once.")
	}

	logger.GetLogger().Log(logger.Debug, "TestRacMaintWithRandomStatusChangeInAsync done -------------------------------------------------------------")
}

// churnMaintStatus replaces the rows of the maintenance table with two randomly
// chosen statuses roughly once a second until ctx is done. It returns nil when
// it stops because ctx is done, and the first write error otherwise.
func churnMaintStatus(ctx context.Context, db *sql.DB, statuses []string) error {
	for {
		mainStatus := statuses[rand.Intn(len(statuses))]
		tafStatus := statuses[rand.Intn(len(statuses))]
		if err := writeMaintStatus(ctx, db, mainStatus, tafStatus); err != nil {
			if ctx.Err() != nil {
				// The write was interrupted by the stop request; not a failure.
				return nil
			}
			return err
		}
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(1000 * time.Millisecond):
		}
	}
}

// writeMaintStatus empties the maintenance table and, in the same transaction,
// inserts one row for module "hera-test" with mainStatus and one row for module
// "hera-test_taf" with tafStatus. Both rows use inst_id 0, a status_time two
// seconds in the future, and the local hostname as machine.
func writeMaintStatus(ctx context.Context, db *sql.DB, mainStatus, tafStatus string) error {
	hostname, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("getting hostname: %w", err)
	}

	conn, err := db.Conn(ctx)
	if err != nil {
		return fmt.Errorf("getting connection: %w", err)
	}
	defer conn.Close()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("beginning transaction: %w", err)
	}
	// After a Commit this only returns sql.ErrTxDone; on the error paths it
	// undoes the partial write. Statements prepared on tx are closed by
	// database/sql when the transaction ends.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, "/*cmd*/delete from "+tableName)
	if err != nil {
		return fmt.Errorf("preparing delete from %s: %w", tableName, err)
	}
	if _, err = stmt.Exec(); err != nil {
		return fmt.Errorf("deleting from %s: %w", tableName, err)
	}

	stmt, err = tx.PrepareContext(ctx, "/*cmd*/insert into "+tableName+" (inst_id, status, status_time, module, machine) values (?,?,?,?,?)")
	if err != nil {
		return fmt.Errorf("preparing insert into %s: %w", tableName, err)
	}
	// how to do inst_id
	if _, err = stmt.Exec(0 /*max instid*/, mainStatus, time.Now().Unix()+2, "hera-test", hostname); err != nil {
		return fmt.Errorf("inserting hera-test row: %w", err)
	}
	if _, err = stmt.Exec(0, tafStatus, time.Now().Unix()+2, "hera-test_taf", hostname); err != nil {
		return fmt.Errorf("inserting hera-test_taf row: %w", err)
	}

	if err = tx.Commit(); err != nil {
		return fmt.Errorf("committing: %w", err)
	}
	return nil
}

0 comments on commit 4af3606

Please sign in to comment.