Skip to content

Commit

Permalink
Merge pull request #47 from sonroyaalmerol/remove-passthrough-proxy
Browse files Browse the repository at this point in the history
use client connections for agent ping instead
  • Loading branch information
sonroyaalmerol authored Jan 11, 2025
2 parents 1de571a + 39902e3 commit 5690bfa
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 72 deletions.
32 changes: 8 additions & 24 deletions internal/store/agent_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,22 @@ package store

import (
"strings"
"time"

"github.com/sonroyaalmerol/pbs-plus/internal/websockets"
)

func (storeInstance *Store) AgentPing(agentTarget *Target) (bool, error) {
func (storeInstance *Store) AgentPing(agentTarget *Target) bool {
splittedName := strings.Split(agentTarget.Name, " - ")
agentHostname := splittedName[0]

if storeInstance.WSHub == nil {
return false, nil
return false
}

err := storeInstance.WSHub.SendCommand(agentHostname, websockets.Message{
Type: "ping",
Content: "ping",
})
if err != nil {
return false, err
storeInstance.WSHub.ClientsMux.RLock()
if client, ok := storeInstance.WSHub.Clients[agentHostname]; ok && client != nil {
storeInstance.WSHub.ClientsMux.RUnlock()
return true
}
storeInstance.WSHub.ClientsMux.RUnlock()

listener := storeInstance.WSHub.Broadcast.Subscribe()
defer storeInstance.WSHub.Broadcast.CancelSubscription(listener)

for {
select {
case resp := <-listener:
if resp.Type == "ping" && resp.Content == "pong" {
return true, nil
}
case <-time.After(time.Second * 2):
return false, nil
}
}
return false
}
29 changes: 3 additions & 26 deletions internal/store/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"time"

_ "github.com/mattn/go-sqlite3"
"github.com/sonroyaalmerol/pbs-plus/internal/syslog"
"github.com/sonroyaalmerol/pbs-plus/internal/utils"
"github.com/sonroyaalmerol/pbs-plus/internal/websockets"
)
Expand Down Expand Up @@ -469,16 +468,8 @@ func (store *Store) GetTarget(name string) (*Target, error) {
return nil, fmt.Errorf("GetTarget: error scanning row from targets table -> %w", err)
}

syslogger, err := syslog.InitializeLogger()
if err != nil {
return nil, fmt.Errorf("GetTarget: failed to initialize logger -> %w", err)
}

if strings.HasPrefix(target.Path, "agent://") {
target.ConnectionStatus, err = store.AgentPing(&target)
if err != nil {
syslogger.Errorf("GetTarget: error agent ping -> %v", err)
}
target.ConnectionStatus = store.AgentPing(&target)
target.IsAgent = true
} else {
target.ConnectionStatus = utils.IsValid(target.Path)
Expand All @@ -505,15 +496,8 @@ func (store *Store) GetAllTargetsByIP(ip string) ([]Target, error) {
return nil, fmt.Errorf("GetAllTargetsByIP: error scanning row from targets -> %w", err)
}

syslogger, err := syslog.InitializeLogger()
if err != nil {
return nil, fmt.Errorf("GetAllTargetsByIP: failed to initialize logger -> %w", err)
}
if strings.HasPrefix(target.Path, "agent://") {
target.ConnectionStatus, err = store.AgentPing(&target)
if err != nil {
syslogger.Errorf("GetAllTargetsByIP: error agent ping -> %v", err)
}
target.ConnectionStatus = store.AgentPing(&target)
target.IsAgent = true
} else {
target.ConnectionStatus = utils.IsValid(target.Path)
Expand Down Expand Up @@ -564,15 +548,8 @@ func (store *Store) GetAllTargets() ([]Target, error) {
return nil, fmt.Errorf("GetAllTargets: error scanning row from targets -> %w", err)
}

syslogger, err := syslog.InitializeLogger()
if err != nil {
return nil, fmt.Errorf("GetTarget: failed to initialize logger -> %w", err)
}
if strings.HasPrefix(target.Path, "agent://") {
target.ConnectionStatus, err = store.AgentPing(&target)
if err != nil {
syslogger.Errorf("GetTarget: error agent ping -> %v", err)
}
target.ConnectionStatus = store.AgentPing(&target)
target.IsAgent = true
} else {
target.ConnectionStatus = utils.IsValid(target.Path)
Expand Down
22 changes: 0 additions & 22 deletions internal/websockets/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,32 +80,10 @@ func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request)
clientID := initMessage.Content
client := NewClient(clientID, conn, s.Broadcast)

// Subscribe to broadcasts
subscription := s.Broadcast.Subscribe()

s.ClientsMux.Lock()
s.Clients[clientID] = client
s.ClientsMux.Unlock()

// Start broadcast handler
go func() {
defer s.Broadcast.CancelSubscription(subscription)
for {
select {
case msg := <-subscription:
select {
case client.send <- msg:
// Message queued successfully
default:
// Client's buffer is full, consider disconnecting
log.Printf("Client %s message buffer full, dropping message", clientID)
}
case <-client.done:
return
}
}
}()

// Start the read/write pumps
go client.readPump(s)
go client.writePump()
Expand Down

0 comments on commit 5690bfa

Please sign in to comment.