Skip to content

Commit

Permalink
feat: pg get checkpoint end (#5126)
Browse files Browse the repository at this point in the history
  • Loading branch information
kizuna-lek authored Sep 18, 2023
1 parent 8e154e3 commit 348fe5e
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 2 deletions.
4 changes: 4 additions & 0 deletions lorry/component/postgres/local_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func PgCtl(arg string) (string, error) {
return ExecCommand("su", args...)
}

// PgWalDump invokes the pg_waldump executable with the supplied arguments.
// The call is delegated to ExecCommand and its (output, error) pair is
// returned to the caller unchanged.
func PgWalDump(args ...string) (string, error) {
	const binary = "pg_waldump"

	return ExecCommand(binary, args...)
}

func ExecCommand(name string, args ...string) (string, error) {
var stdout, stderr bytes.Buffer
cmd := exec.Command(name, args...)
Expand Down
19 changes: 17 additions & 2 deletions lorry/component/postgres/officalpostgres/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (mgr *Manager) checkTimelineAndLsn(ctx context.Context, cluster *dcs.Cluste
needRewind = true
case localTimeLine == primaryTimeLine:
needRewind = false
case primaryTimeLine > 1:
case localTimeLine < primaryTimeLine:
history = mgr.getHistory(cluster.GetMemberAddr(*cluster.GetLeaderMember()), primaryTimeLine)
}

Expand All @@ -453,7 +453,8 @@ func (mgr *Manager) checkTimelineAndLsn(ctx context.Context, cluster *dcs.Cluste
case localLsn >= h.SwitchPoint:
needRewind = true
default:
// TODO:get checkpoint end
checkPointEnd := mgr.getCheckPointEnd(localTimeLine, localLsn)
needRewind = h.SwitchPoint != checkPointEnd
}
exitFlag = true
break
Expand All @@ -471,6 +472,19 @@ func (mgr *Manager) checkTimelineAndLsn(ctx context.Context, cluster *dcs.Cluste
return needRewind
}

// getCheckPointEnd asks pg_waldump for two records starting at lsn on the
// given timeline and derives an LSN from the error it reports.
//
// The value is extracted from the error text via postgres.ParsePgWalDumpError
// and converted with postgres.ParsePgLsn. When pg_waldump returns no error, or
// returns an empty output string, 0 is returned instead.
//
// NOTE(review): this relies on PgWalDump returning a non-empty output together
// with a non-nil error. If the underlying ExecCommand yields an empty string
// whenever it fails, this function always returns 0 - confirm against
// ExecCommand.
func (mgr *Manager) getCheckPointEnd(timeLine, lsn int64) int64 {
	startLsn := postgres.FormatPgLsn(lsn)
	timeLineArg := strconv.FormatInt(timeLine, 10)

	output, dumpErr := postgres.PgWalDump("-t", timeLineArg, "-s", startLsn, "-n", "2")
	if dumpErr != nil && output != "" {
		// The interesting LSN is embedded in pg_waldump's error message.
		return postgres.ParsePgLsn(postgres.ParsePgWalDumpError(dumpErr.Error(), startLsn))
	}

	return 0
}

func (mgr *Manager) getPrimaryTimeLine(host string) (int64, error) {
resp, err := postgres.Psql("-h", host, "replication=database", "-c", "IDENTIFY_SYSTEM")
if err != nil {
Expand All @@ -494,6 +508,7 @@ func (mgr *Manager) getLocalTimeLineAndLsn(ctx context.Context) (bool, int64, in
return mgr.getLocalTimeLineAndLsnFromControlData()
}

// TODO:check in recovery
inRecovery = true
timeLine := mgr.getReceivedTimeLine(ctx)
lsn, _ := mgr.getLsnWithHost(ctx, "replay", "")
Expand Down
19 changes: 19 additions & 0 deletions lorry/component/postgres/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package postgres
import (
"bufio"
"context"
"fmt"
"strconv"
"strings"

Expand Down Expand Up @@ -323,6 +324,10 @@ func ParsePgLsn(str string) int64 {
return prefix*0x100000000 + suffix
}

// FormatPgLsn renders a 64-bit LSN as "HIGH/LOW" in upper-case hexadecimal,
// where HIGH is the upper 32 bits (no padding) and LOW is the lower 32 bits
// zero-padded to eight digits, e.g. 16777376 -> "0/010000A0".
func FormatPgLsn(lsn int64) string {
	high := lsn >> 32
	low := uint32(lsn) // truncation keeps exactly the low 32 bits

	return fmt.Sprintf("%X/%08X", high, low)
}

func ParsePrimaryConnInfo(str string) map[string]string {
infos := strings.Split(str, " ")
result := make(map[string]string)
Expand All @@ -336,3 +341,17 @@ func ParsePrimaryConnInfo(str string) map[string]string {

return result
}

// ParsePgWalDumpError extracts the LSN reported after "invalid record length
// at" from a pg_waldump error message of the form
//
//	error in WAL record at <lsnStr>: invalid record length at <LSN>: wanted ...
//
// errorInfo is the full error text and lsnStr is the start LSN that must
// appear verbatim in the message. The text between the prefix and the
// following ": wanted " marker is returned. If either marker is missing, the
// empty string is returned.
func ParsePgWalDumpError(errorInfo string, lsnStr string) string {
	prefixPattern := fmt.Sprintf("error in WAL record at %s: invalid record length at ", lsnStr)
	suffixPattern := ": wanted "

	// Check the prefix position before adding its length; otherwise a missing
	// prefix (-1) is masked and the slice below can be garbage or panic.
	prefixIndex := strings.Index(errorInfo, prefixPattern)
	if prefixIndex == -1 {
		return ""
	}
	startIndex := prefixIndex + len(prefixPattern)

	// Only look for the suffix after the prefix so that start <= end always holds.
	endOffset := strings.Index(errorInfo[startIndex:], suffixPattern)
	if endOffset == -1 {
		return ""
	}

	return errorInfo[startIndex : startIndex+endOffset]
}
27 changes: 27 additions & 0 deletions lorry/component/postgres/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ func TestParsePGLsn(t *testing.T) {
})
}

// TestFormatPgLsn verifies that FormatPgLsn renders a numeric LSN as
// "HIGH/LOW" with the low word zero-padded to eight hex digits.
func TestFormatPgLsn(t *testing.T) {
	t.Run("format lsn", func(t *testing.T) {
		const want = "0/010000A0"

		got := FormatPgLsn(int64(16777376))

		assert.Equal(t, want, got)
	})
}

func TestParsePrimaryConnInfo(t *testing.T) {
t.Run("legal primary conn info str", func(t *testing.T) {
primaryConnInfoStr := "host=pg-pg-replication-0.pg-pg-replication-headless port=5432 user=postgres application_name=my-application"
Expand Down Expand Up @@ -210,3 +219,21 @@ func TestParseHistory(t *testing.T) {
assert.Equal(t, ParsePgLsn("0/60000A0 "), history.History[1].SwitchPoint)
})
}

// TestParsePgWalDumpError exercises ParsePgWalDumpError with a well-formed
// pg_waldump error message and with one that lacks the ": wanted " marker.
func TestParsePgWalDumpError(t *testing.T) {
	const lsnStr = "0/182E220"

	cases := []struct {
		name      string
		errorInfo string
		expected  string
	}{
		{
			name:      "parse success",
			errorInfo: "pg_waldump: fatal: error in WAL record at 0/182E220: invalid record length at 0/182E298: wanted 24, got 0",
			expected:  "0/182E298",
		},
		{
			name:      "parse failed",
			errorInfo: "pg_waldump: fatal: error in WAL record at 0/182E220: invalid record length at 0/182E298",
			expected:  "",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			resp := ParsePgWalDumpError(tc.errorInfo, lsnStr)

			assert.Equal(t, tc.expected, resp)
		})
	}
}

0 comments on commit 348fe5e

Please sign in to comment.