diff --git a/lib/config.go b/lib/config.go index d250bdd4..e1b67efb 100644 --- a/lib/config.go +++ b/lib/config.go @@ -41,7 +41,7 @@ type Config struct { NumStdbyDbs int InitialMaxChildren int ReadonlyPct int - TafPct int + TafChildrenPct int // // backlog // @@ -393,7 +393,7 @@ func InitConfig() error { } gAppConfig.ReadonlyPct = cdb.GetOrDefaultInt("readonly_children_pct", 0) - gAppConfig.TafPct = cdb.GetOrDefaultInt("taf_children_pct", 100) + gAppConfig.TafChildrenPct = cdb.GetOrDefaultInt("taf_children_pct", 100) gAppConfig.InitialMaxChildren = numWorkers if gAppConfig.EnableWhitelistTest { if gAppConfig.NumWhitelistChildren < 2 { @@ -644,15 +644,19 @@ func GetNumRWorkers(shard int) int { // GetNumStdByWorkers gets the number of workers for the "StdBy" pool func GetNumStdByWorkers(shard int) int { - numWhiteList := GetWhiteListChildCount(shard) - if numWhiteList > 0 { - return numWhiteList + num := GetNumWWorkers(shard) + // TafChildrenPct should not be greater than 100. + if gAppConfig.TafChildrenPct > 100 { + return num } - num := GetNumWorkers(shard) - if gAppConfig.TafPct > 0 { - num = num * gAppConfig.TafPct / 100 - if num == 0 { + if gAppConfig.EnableTAF && gAppConfig.TafChildrenPct < 100 { + if gAppConfig.TafChildrenPct < 0 { num = 1 + } else { + num = num * gAppConfig.TafChildrenPct / 100 + if num == 0 { + num = 1 + } } } return num diff --git a/tests/unittest/adjustTafChildrenPct/main_test.go b/tests/unittest/adjustTafChildrenPct/main_test.go new file mode 100644 index 00000000..e32ac169 --- /dev/null +++ b/tests/unittest/adjustTafChildrenPct/main_test.go @@ -0,0 +1,102 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "os" + "testing" + "time" + + _ "github.com/paypal/hera/client/gosqldriver/tcp" + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux +//var tableName string + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + fmt.Println ("setup() begin") + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31002" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["enable_taf"] = "true" + + appcfg["opscfg.default.server.max_connections"] = "10" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.log_level"] = "5" + + if os.Getenv("WORKER") == "postgres" { + return appcfg, opscfg, testutil.PostgresWorker + } + return appcfg, opscfg, testutil.MySQLWorker +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, nil)) +} + +func TestAdjustTafChildrenPct(t *testing.T) { + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPct begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + + shard := 0 + db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard)) + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer db.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } + defer conn.Close() + + rows, _ := conn.QueryContext(ctx, "SELECT version()") + + if !rows.Next() { + t.Fatalf("Expected 1 row") + } + rows.Close() + + acpt, err := testutil.StatelogGetField(2, "hera.taf") + if err != nil { + t.Fatalf("Error reading state log: %s\n", err.Error()) + } + + if acpt != 10 { + t.Fatalf("Expected TAF pool size: 10, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, " -v hera.taf") + if acpt != 5 { + t.Fatalf("Expected primary pool size: 10, Actual %d\n", acpt) + } + + fmt.Println ("We now change max connections at runtime"); + testutil.ModifyOpscfgParam (t, "hera.txt", "max_connections", "5") + //Wait for opsfcg change to take effect + time.Sleep(10 * time.Second) + + acpt, _ = testutil.StatelogGetField(2, "hera.taf") + + if acpt != 5 { + t.Fatalf("Expected TAF pool size: 5, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, " -v hera.taf") + if acpt != 5 { + t.Fatalf("Expected primary pool size: 5, Actual %d\n", acpt) + } + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPct done -------------------------------------------------------------") +} \ No newline at end of file diff --git a/tests/unittest/adjustTafChildrenPctModified/main_test.go b/tests/unittest/adjustTafChildrenPctModified/main_test.go new file mode 100644 index 00000000..371f5a83 --- /dev/null +++ b/tests/unittest/adjustTafChildrenPctModified/main_test.go @@ -0,0 +1,102 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "os" + "testing" + "time" + + _ "github.com/paypal/hera/client/gosqldriver/tcp" + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux +//var tableName string + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + fmt.Println ("setup() begin") + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31002" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["enable_taf"] = "true" + appcfg["taf_children_pct"] = "20" + appcfg["opscfg.default.server.max_connections"] = "10" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.log_level"] = "5" + + if os.Getenv("WORKER") == "postgres" { + return appcfg, opscfg, testutil.PostgresWorker + } + return appcfg, opscfg, testutil.MySQLWorker +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, nil)) +} + +func TestAdjustTafChildrenPctModified(t *testing.T) { + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctModified begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + + shard := 0 + db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard)) + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } + db.SetMaxIdleConns(0) + defer db.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } + defer conn.Close() + + rows, _ := conn.QueryContext(ctx, "SELECT version()") + + if !rows.Next() { + t.Fatalf("Expected 1 row") + } + rows.Close() + + acpt, err := testutil.StatelogGetField(2, "hera.taf") + if err != nil { + t.Fatalf("Error reading state log: %s\n", err.Error()) + } + + if acpt != 3 { + t.Fatalf("Expected TAF pool size: 2, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, " -v hera.taf") + if acpt != 5 { + t.Fatalf("Expected primary pool size: 10, Actual %d\n", acpt) + } + + fmt.Println ("We now change max connections at runtime"); + testutil.ModifyOpscfgParam (t, "hera.txt", "max_connections", "5") + //Wait for opsfcg change to take effect + time.Sleep(10 * time.Second) + + acpt, _ = testutil.StatelogGetField(2, "hera.taf") + + if acpt != 5 { + t.Fatalf("Expected TAF pool size: 1, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, " -v hera.taf") + if acpt != 5 { + t.Fatalf("Expected primary pool size: 5, Actual %d\n", acpt) + } + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctModified done -------------------------------------------------------------") +} \ No newline at end of file diff --git a/tests/unittest/adjustTafChildrenPctModifiedWithSharding/main_test.go b/tests/unittest/adjustTafChildrenPctModifiedWithSharding/main_test.go new file mode 100644 index 00000000..2c605c02 --- /dev/null +++ b/tests/unittest/adjustTafChildrenPctModifiedWithSharding/main_test.go @@ -0,0 +1,114 @@ +package main + +import ( + "fmt" + "os" + "testing" + "time" + + _ "github.com/paypal/hera/client/gosqldriver/tcp" + "github.com/paypal/hera/tests/unittest/testutil" + "github.com/paypal/hera/utility/logger" +) + +var mx testutil.Mux +//var tableName string + +func cfg() (map[string]string, map[string]string, testutil.WorkerType) { + fmt.Println ("setup() begin") + appcfg := make(map[string]string) + // best to chose an "unique" port in case golang runs tests in paralel + appcfg["bind_port"] = "31002" + appcfg["log_level"] = "5" + appcfg["log_file"] = "hera.log" + appcfg["enable_taf"] = "true" + appcfg["taf_children_pct"] = "20" + appcfg["enable_sharding"] = "true" + appcfg["shard_key_name"] = "email_addr" + appcfg["shard_key_value_type_is_string"] = "true" + appcfg["num_shards"] = "2" + appcfg["sharding_cfg_reload_interval"] = "0" + + appcfg["opscfg.default.server.max_connections"] = "10" + + opscfg := make(map[string]string) + opscfg["opscfg.default.server.log_level"] = "5" + + if os.Getenv("WORKER") == "postgres" { + return appcfg, opscfg, testutil.PostgresWorker + } + return appcfg, opscfg, testutil.MySQLWorker +} + +func setupShardMap() { + testutil.RunDML("DROP TABLE IF EXISTS test_str_sk") + testutil.RunDML("create table test_str_sk (email_addr varchar(64), note varchar(64))") + testutil.RunDML("DROP TABLE IF EXISTS hera_shard_map") + testutil.RunDML("create table hera_shard_map ( scuttle_id smallint not null, shard_id smallint not null, status char(1) , read_status char(1), write_status char(1), remarks varchar(500))") + for i := 0; i < 1024; i++ { + testutil.RunDML(fmt.Sprintf("insert into hera_shard_map ( scuttle_id, shard_id, status, read_status, write_status ) values ( %d, 0, 'Y', 'Y', 'Y' )", i) ) + } +} + +func TestMain(m *testing.M) { + os.Exit(testutil.UtilMain(m, cfg, nil)) +} + +func TestAdjustTafChildrenPctWithSharding(t *testing.T) { + setupShardMap() + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctWithSharding begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + + acpt, err := testutil.StatelogGetField(2, "hera.taf.sh0") + if err != nil { + t.Fatalf("Error reading state log: %s\n", err.Error()) + } + + if acpt != 2 { + t.Fatalf("Expected TAF sh0 pool size: 2, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, "hera.taf.sh1") + + if acpt != 2 { + t.Fatalf("Expected TAF sh1 pool size: 2, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, " -v hera.sh0") + if acpt != 10 { + t.Fatalf("Expected primary pool sh0 size: 10, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, " -v hera.sh1") + if acpt != 10 { + t.Fatalf("Expected primary pool sh1 size: 10, Actual %d\n", acpt) + } + + fmt.Println ("We now change max connections at runtime"); + testutil.ModifyOpscfgParam (t, "hera.txt", "max_connections", "5") + //Wait for opsfcg change to take effect + time.Sleep(10 * time.Second) + + acpt, _ = testutil.StatelogGetField(2, "hera.taf.sh0") + + if acpt != 1 { + t.Fatalf("Expected TAF sh0 pool size: 1, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, "hera.taf.sh1") + + if acpt != 1 { + t.Fatalf("Expected TAF sh1 pool size: 1, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, " -v hera.sh0") + if acpt != 5 { + t.Fatalf("Expected primary sh0 pool size: 5, Actual %d\n", acpt) + } + + acpt, _ = testutil.StatelogGetField(2, " -v hera.sh1") + if acpt != 5 { + t.Fatalf("Expected primary sh1 pool size: 5, Actual %d\n", acpt) + } + + logger.GetLogger().Log(logger.Debug, "TestAdjustTafChildrenPctWithSharding done -------------------------------------------------------------") +} \ No newline at end of file diff --git a/tests/unittest/testutil/setup.go b/tests/unittest/testutil/setup.go index 795c78d0..6c365e03 100644 --- a/tests/unittest/testutil/setup.go +++ b/tests/unittest/testutil/setup.go @@ -360,9 +360,15 @@ func (m *mux) StartServer() error { os.Setenv("username", "herausertest") os.Setenv("password", "Hera-User-Test-9") os.Setenv("TWO_TASK", "tcp(127.0.0.1:2121)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0", "tcp(127.0.0.1:2121)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0_0", "tcp(127.0.0.1:2121)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0_1", "tcp(127.0.0.1:2121)/heratestdb") } else if xMysql == "auto" { ip := MakeDB("mysql22", "heratestdb", MySQL) os.Setenv("TWO_TASK", "tcp("+ip+":3306)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0", "tcp("+ip+":3306)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0_0", "tcp("+ip+":3306)/heratestdb") + os.Setenv("TWO_TASK_STANDBY0_1", "tcp("+ip+":3306)/heratestdb") os.Setenv("TWO_TASK_1", "tcp("+ip+":3306)/heratestdb") os.Setenv("TWO_TASK_2", "tcp("+ip+":3306)/heratestdb") os.Setenv("MYSQL_IP", ip) diff --git a/tests/unittest/testutil/util.go b/tests/unittest/testutil/util.go index 979ee4cb..b842a358 100644 --- a/tests/unittest/testutil/util.go +++ b/tests/unittest/testutil/util.go @@ -7,10 +7,12 @@ import ( "database/sql" "errors" "fmt" + "io/ioutil" "os" "os/exec" "regexp" "strings" + "testing" "time" "github.com/paypal/hera/utility/logger" @@ -20,8 +22,13 @@ var ( INCOMPLETE = errors.New("Incomplete row") ) -func StatelogGetField(pos int) (int, error) { - out, err := exec.Command("/bin/bash", "-c", "/usr/bin/tail -n 1 state.log").Output() +func StatelogGetField(pos int, pattern ...string) (int, error) { + cmd := "/usr/bin/tail -n 1 state.log" + if len(pattern) > 0 { + cmd = fmt.Sprintf("/usr/bin/tail -n 1 state.log | grep '%s'", pattern[0]) + // fmt.Println("cmd:", cmd) + } + out, err := exec.Command("/bin/bash", "-c", cmd).Output() if err != nil { return -1, err } @@ -216,3 +223,24 @@ func ClearLogsData() { } defer calLogFile.Close() } + +func ModifyOpscfgParam (t *testing.T, logfile string, opscfg_param string, opscfg_value string) { + //Read file + data, err := ioutil.ReadFile(runFolder + "/" + logfile) + if err != nil { + t.Fatal(err) + } + lines := strings.Split(string(data), "\n") + //Modify the opcfg value + for i, line := range lines { + if strings.Contains(line, opscfg_param) { + lines[i] = "opscfg.default.server." + opscfg_param + "=" + opscfg_value + } + } + output := strings.Join(lines, "\n") + // write to file + err = ioutil.WriteFile(runFolder + "/" + logfile, []byte(output), 0644) + if err != nil { + t.Fatal(err) + } +}