Skip to content

Commit

Permalink
implement log forwarding from client
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Nov 6, 2024
1 parent ab8f45a commit d2f32b5
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 105 deletions.
19 changes: 19 additions & 0 deletions cmd/pbs_windows_agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"golang.org/x/sys/windows"

"github.com/getlantern/systray"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/agent/serverlog"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/agent/sftp"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/agent/snapshots"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/utils"
Expand Down Expand Up @@ -52,6 +53,8 @@ func main() {
os.Exit(1)
}

serverLog, _ := serverlog.InitializeLogger()

// Reserve port 33450-33476
drives := utils.GetLocalDrives()
ctx := context.Background()
Expand All @@ -64,12 +67,18 @@ func main() {

err = sftpConfig.PopulateKeys()
if err != nil {
if serverLog != nil {
serverLog.Print(fmt.Sprintf("Unable to populate SFTP keys: %s", err))
}
utils.ShowMessageBox("Error", fmt.Sprintf("Unable to populate SFTP keys: %s", err))
os.Exit(1)
}

port, err := utils.DriveLetterPort(rune)
if err != nil {
if serverLog != nil {
serverLog.Print(fmt.Sprintf("Unable to map letter to port: %s", err))
}
utils.ShowMessageBox("Error", fmt.Sprintf("Unable to map letter to port: %s", err))
os.Exit(1)
}
Expand All @@ -87,13 +96,18 @@ func main() {
}

func onReady(serverUrl string) func() {
serverLog, _ := serverlog.InitializeLogger()

return func() {
systray.SetIcon(icon)
systray.SetTitle("Proxmox Backup Agent")
systray.SetTooltip("Orchestrating backups with Proxmox Backup Server")

url, err := url.Parse(serverUrl)
if err != nil {
if serverLog != nil {
serverLog.Print(fmt.Sprintf("Failed to parse server URL: %s", err))
}
utils.ShowMessageBox("Error", fmt.Sprintf("Failed to parse server URL: %s", err))
os.Exit(1)
}
Expand All @@ -115,6 +129,8 @@ func isAdmin() bool {
}

func runAsAdmin() {
serverLog, _ := serverlog.InitializeLogger()

verb := "runas"
exe, _ := os.Executable()
cwd, _ := os.Getwd()
Expand All @@ -129,6 +145,9 @@ func runAsAdmin() {

err := windows.ShellExecute(0, verbPtr, exePtr, argPtr, cwdPtr, showCmd)
if err != nil {
if serverLog != nil {
serverLog.Print(fmt.Sprintf("Failed to run as administrator: %s", err))
}
utils.ShowMessageBox("Error", fmt.Sprintf("Failed to run as administrator: %s", err))
}
}
64 changes: 64 additions & 0 deletions internal/agent/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package agent

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/sonroyaalmerol/pbs-d2d-backup/internal/utils"
)

var httpClient *http.Client

func ProxmoxHTTPRequest(method, url string, body io.Reader, respBody any) error {
serverUrl := os.Getenv("PBS_AGENT_SERVER")
req, err := http.NewRequest(
method,
fmt.Sprintf(
"%s%s",
strings.TrimSuffix(serverUrl, "/"),
url,
),
body,
)

if err != nil {
return fmt.Errorf("ProxmoxHTTPRequest: error creating http request -> %w", err)
}

req.Header.Add("Content-Type", "application/json")

if httpClient == nil {
httpClient = &http.Client{
Timeout: time.Second * 30,
Transport: utils.BaseTransport,
}
}

resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("ProxmoxHTTPRequest: error executing http request -> %w", err)
}
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()

rawBody, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("ProxmoxHTTPRequest: error getting body content -> %w", err)
}

if respBody != nil {
err = json.Unmarshal(rawBody, respBody)
if err != nil {
return fmt.Errorf("ProxmoxHTTPRequest: error json unmarshal body content (%s) -> %w", string(rawBody), err)
}
}

return nil
}
51 changes: 51 additions & 0 deletions internal/agent/serverlog/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package serverlog

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"os"

"github.com/sonroyaalmerol/pbs-d2d-backup/internal/agent"
)

type Logger struct {
Hostname string
}

type PingResponse struct {
Pong bool `json:"pong"`
}

func InitializeLogger() (*Logger, error) {
var pingResp PingResponse
err := agent.ProxmoxHTTPRequest(http.MethodGet, "/api2/json/ping", nil, &pingResp)
if err != nil {
return nil, fmt.Errorf("InitializeLogger: failed to ping server -> %w", err)
}

hostname, _ := os.Hostname()
return &Logger{Hostname: hostname}, nil
}

type LogRequest struct {
Hostname string `json:"hostname"`
Message string `json:"message"`
}

func (l *Logger) Print(v string) {
body, err := json.Marshal(&LogRequest{
Hostname: l.Hostname,
Message: v,
})
if err != nil {
log.Println(fmt.Errorf("Print: error marshalling request body -> %w", err).Error())
}

err = agent.ProxmoxHTTPRequest(http.MethodPost, "/ap2/json/d2d/agent-log", bytes.NewBuffer(body), nil)
if err != nil {
log.Println(fmt.Errorf("Print: error posting log to server -> %w", err).Error())
}
}
27 changes: 27 additions & 0 deletions internal/agent/sftp/sftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,23 @@ import (
"sync"

"github.com/pkg/sftp"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/agent/serverlog"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/agent/snapshots"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/utils"
"golang.org/x/crypto/ssh"
)

func Serve(ctx context.Context, wg *sync.WaitGroup, sftpConfig *SFTPConfig, address, port string, driveLetter string) {
defer wg.Done()

logger, _ := serverlog.InitializeLogger()

listenAt := fmt.Sprintf("%s:%s", address, port)
listener, err := net.Listen("tcp", listenAt)
if err != nil {
if logger != nil {
logger.Print(fmt.Sprintf("Port is already in use! Failed to listen on %s: %v", listenAt, err))
}
utils.ShowMessageBox("Fatal Error", fmt.Sprintf("Port is already in use! Failed to listen on %s: %v", listenAt, err))
os.Exit(1)
}
Expand All @@ -39,6 +46,9 @@ func Serve(ctx context.Context, wg *sync.WaitGroup, sftpConfig *SFTPConfig, addr
default:
conn, err := listener.Accept()
if err != nil {
if logger != nil {
logger.Print(fmt.Sprintf("failed to accept connection: %v", err))
}
utils.ShowMessageBox("Error", fmt.Sprintf("failed to accept connection: %v", err))
continue
}
Expand All @@ -50,20 +60,30 @@ func Serve(ctx context.Context, wg *sync.WaitGroup, sftpConfig *SFTPConfig, addr

func handleConnection(conn net.Conn, sftpConfig *SFTPConfig, driveLetter string) {
defer conn.Close()
logger, _ := serverlog.InitializeLogger()

server, err := url.Parse(sftpConfig.Server)
if err != nil {
if logger != nil {
logger.Print(fmt.Sprintf("failed to parse server IP: %v", err))
}
utils.ShowMessageBox("Error", fmt.Sprintf("failed to parse server IP: %v", err))
return
}

if !strings.Contains(conn.RemoteAddr().String(), server.Hostname()) {
if logger != nil {
logger.Print(fmt.Sprintf("WARNING: an unregistered client has attempted to connect: %s", conn.RemoteAddr().String()))
}
utils.ShowMessageBox("Error", fmt.Sprintf("WARNING: an unregistered client has attempted to connect: %s", conn.RemoteAddr().String()))
return
}

sconn, chans, reqs, err := ssh.NewServerConn(conn, sftpConfig.ServerConfig)
if err != nil {
if logger != nil {
logger.Print(fmt.Sprintf("failed to perform SSH handshake: %v", err))
}
utils.ShowMessageBox("Error", fmt.Sprintf("failed to perform SSH handshake: %v", err))
return
}
Expand Down Expand Up @@ -102,9 +122,13 @@ func handleRequests(requests <-chan *ssh.Request) {

func handleSFTP(channel ssh.Channel, driveLetter string) {
defer channel.Close()
logger, _ := serverlog.InitializeLogger()

snapshot, err := snapshots.Snapshot(driveLetter)
if err != nil {
if logger != nil {
logger.Print(fmt.Sprintf("failed to initialize snapshot: %v", err))
}
utils.ShowMessageBox("Fatal Error", fmt.Sprintf("failed to initialize snapshot: %s", err))
os.Exit(1)
}
Expand All @@ -113,6 +137,9 @@ func handleSFTP(channel ssh.Channel, driveLetter string) {
sftpHandler, err := NewSftpHandler(ctx, driveLetter, snapshot)
if err != nil {
snapshot.Close()
if logger != nil {
logger.Print(fmt.Sprintf("failed to initialize handler: %v", err))
}
utils.ShowMessageBox("Fatal Error", fmt.Sprintf("failed to initialize handler: %s", err))
os.Exit(1)
}
Expand Down
84 changes: 1 addition & 83 deletions internal/proxy/controllers/agents/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@ import (
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"

"github.com/sonroyaalmerol/pbs-d2d-backup/internal/logger"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/proxy/controllers"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/store"
"github.com/sonroyaalmerol/pbs-d2d-backup/internal/utils"
"golang.org/x/crypto/ssh"
)

type LogRequest struct {
Expand Down Expand Up @@ -43,84 +38,7 @@ func AgentLogHandler(storeInstance *store.Store) func(http.ResponseWriter, *http
return
}

syslogger.Info(fmt.Sprintf("PBS Agent Log [%s]: %s", reqParsed.Hostname, reqParsed.Message))

w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(map[string]string{"success": "true"})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
controllers.WriteErrorResponse(w, err)
return
}
}
}

func AgentPingHandler(storeInstance *store.Store) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Invalid HTTP method", http.StatusBadRequest)
}

agentId := r.PathValue("agent_id")
agentTarget, err := storeInstance.GetTarget(agentId)
if err != nil {
w.WriteHeader(http.StatusNotFound)
controllers.WriteErrorResponse(w, err)
return
}

privKeyDir := filepath.Join(store.DbBasePath, "agent_keys")
privKeyFile := filepath.Join(privKeyDir, strings.ReplaceAll(fmt.Sprintf("%s.key", agentTarget.Name), " ", "-"))

pemBytes, err := os.ReadFile(privKeyFile)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
controllers.WriteErrorResponse(w, err)
return
}

signer, err := ssh.ParsePrivateKey(pemBytes)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
controllers.WriteErrorResponse(w, err)
return
}

agentPath := strings.TrimPrefix(agentTarget.Path, "agent://")
agentPathParts := strings.Split(agentPath, "/")
agentHost := agentPathParts[0]
agentDrive := agentPathParts[1]
agentDriveRune := []rune(agentDrive)[0]
agentPort, err := utils.DriveLetterPort(agentDriveRune)

sshConfig := &ssh.ClientConfig{
User: "proxmox",
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}

client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%s", agentHost, agentPort), sshConfig)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
controllers.WriteErrorResponse(w, err)
return
}
defer client.Close()

session, err := client.NewSession()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
controllers.WriteErrorResponse(w, err)
return
}
defer session.Close()

pong, err := session.SendRequest("ping", true, []byte{})
if err != nil || !pong {
w.WriteHeader(http.StatusInternalServerError)
controllers.WriteErrorResponse(w, err)
return
}
syslogger.Info(fmt.Sprintf("PBS Agent [%s]: %s", reqParsed.Hostname, reqParsed.Message))

w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(map[string]string{"success": "true"})
Expand Down
Loading

0 comments on commit d2f32b5

Please sign in to comment.