Skip to content

Commit

Permalink
Save active agent
Browse files Browse the repository at this point in the history
Use the info details provided by authn to save the active agent when a transfer
is created or a decision is resolved.
  • Loading branch information
sevein committed Jul 4, 2024
1 parent 8cbae66 commit 649d552
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 37 deletions.
4 changes: 2 additions & 2 deletions hack/ccp/internal/api/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *Server) CreatePackage(ctx context.Context, req *connect.Request[adminv1
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

pkg, err := s.ctrl.Submit(req.Msg)
pkg, err := s.ctrl.Submit(ctx, req.Msg)
if err != nil {
return nil, connect.NewError(connect.CodeUnknown, nil)
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (s *Server) ResolveDecision(ctx context.Context, req *connect.Request[admin
}

id := uuid.MustParse(req.Msg.Id)
err := s.ctrl.ResolveDecision(id, int(req.Msg.Choice.Id))
err := s.ctrl.ResolveDecision(ctx, id, int(req.Msg.Choice.Id))
if err != nil {
s.logger.Error(err, "Failed to resolve awaiting decision.", "id", id)
return nil, connect.NewError(connect.CodeUnknown, nil)
Expand Down
6 changes: 3 additions & 3 deletions hack/ccp/internal/api/admin/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ func authApiKey(logger logr.Logger, store store.Store) authn.AuthFunc {
return nil, errInvalidAuth
}

ok, err := store.ValidateUserAPIKey(ctx, username, key)
user, err := store.ValidateUserAPIKey(ctx, username, key)
if err != nil {
logger.Error(err, "Cannot look up user details.")
return nil, errInvalidAuth
}
if !ok {
if user == nil {
return nil, errInvalidAuth
}

return username, nil
return user, nil
}
}

Expand Down
29 changes: 23 additions & 6 deletions hack/ccp/internal/api/admin/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"testing"

"connectrpc.com/authn"
"github.com/artefactual/archivematica/hack/ccp/internal/store/storemock"
"github.com/go-logr/logr"
"go.artefactual.dev/tools/mockutil"
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"

"github.com/artefactual/archivematica/hack/ccp/internal/store"
"github.com/artefactual/archivematica/hack/ccp/internal/store/storemock"
)

func TestAuthentication(t *testing.T) {
Expand All @@ -19,11 +21,26 @@ func TestAuthentication(t *testing.T) {
t.Run("Accepts API key", func(t *testing.T) {
t.Parallel()

store := storemock.NewMockStore(gomock.NewController(t))
store.EXPECT().ValidateUserAPIKey(mockutil.Context(), "test", "test").Return(true, nil)
var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
user := authn.GetInfo(r.Context()).(*store.User) // Rretrieve userinfo from context.
assert.DeepEqual(t, user, &store.User{
ID: 12345,
Username: "test",
Email: "test@test.com",
Active: true,
})
})

auth := multiAuthenticate(authApiKey(logr.Discard(), store))
handler := authn.NewMiddleware(auth).Wrap(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
s := storemock.NewMockStore(gomock.NewController(t))
s.EXPECT().ValidateUserAPIKey(mockutil.Context(), "test", "test").Return(&store.User{
ID: 12345,
Username: "test",
Email: "test@test.com",
Active: true,
}, nil)

auth := multiAuthenticate(authApiKey(logr.Discard(), s))
handler = authn.NewMiddleware(auth).Wrap(handler)

req := httptest.NewRequest("GET", "http://example.com/foo", nil)
req.Header.Set("Authorization", "ApiKey test:test")
Expand All @@ -39,7 +56,7 @@ func TestAuthentication(t *testing.T) {
t.Parallel()

store := storemock.NewMockStore(gomock.NewController(t))
store.EXPECT().ValidateUserAPIKey(mockutil.Context(), "test", "12345").Return(false, nil)
store.EXPECT().ValidateUserAPIKey(mockutil.Context(), "test", "12345").Return(nil, nil)

auth := multiAuthenticate(authApiKey(logr.Discard(), store))
handler := authn.NewMiddleware(auth).Wrap(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
Expand Down
6 changes: 3 additions & 3 deletions hack/ccp/internal/api/admin/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (s *Server) ApproveJob(ctx context.Context, req *connect.Request[adminv1.Ap
}

jobID := uuid.MustParse(req.Msg.JobId)
err := s.ctrl.ResolveDecisionLegacy(jobID, req.Msg.Choice)
err := s.ctrl.ResolveDecisionLegacy(ctx, jobID, req.Msg.Choice)
if err != nil {
s.logger.V(2).Info("Failed to approve job.", "err", err, "jobID", jobID, "choice", req.Msg.Choice)
return nil, connect.NewError(connect.CodeUnknown, nil)
Expand Down Expand Up @@ -77,7 +77,7 @@ func (s *Server) ApproveTransferByPath(ctx context.Context, req *connect.Request
chainID = t.BypassChainID
}

if err := s.ctrl.ResolveDecisionLegacy(jobID, chainID.String()); err != nil {
if err := s.ctrl.ResolveDecisionLegacy(ctx, jobID, chainID.String()); err != nil {
return nil, connect.NewError(connect.CodeUnknown, fmt.Errorf("unable to resolve decision: %v", err))
}

Expand Down Expand Up @@ -109,7 +109,7 @@ func (s *Server) ApprovePartialReingest(ctx context.Context, req *connect.Reques
}

jobID, _ := uuid.Parse(job.Id)
if err := s.ctrl.ResolveDecisionLegacy(jobID, chain.ID.String()); err != nil {
if err := s.ctrl.ResolveDecisionLegacy(ctx, jobID, chain.ID.String()); err != nil {
return nil, connect.NewError(connect.CodeUnknown, fmt.Errorf("unable to resolve decision: %v", err))
}

Expand Down
29 changes: 25 additions & 4 deletions hack/ccp/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"sync"
"time"

"connectrpc.com/authn"
"github.com/artefactual-labs/gearmin"
"github.com/go-logr/logr"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"

adminv1 "github.com/artefactual/archivematica/hack/ccp/internal/api/gen/archivematica/ccp/admin/v1beta1"
"github.com/artefactual/archivematica/hack/ccp/internal/cmd/servercmd/metrics"
"github.com/artefactual/archivematica/hack/ccp/internal/derrors"
"github.com/artefactual/archivematica/hack/ccp/internal/ssclient"
"github.com/artefactual/archivematica/hack/ccp/internal/store"
"github.com/artefactual/archivematica/hack/ccp/internal/workflow"
Expand Down Expand Up @@ -119,7 +121,7 @@ func (c *Controller) Run() error {
}

// Submit a transfer request.
func (c *Controller) Submit(req *adminv1.CreatePackageRequest) (*Package, error) {
func (c *Controller) Submit(ctx context.Context, req *adminv1.CreatePackageRequest) (*Package, error) {
// TODO: have NewTransferPackage return a function we can schedule here.
var once sync.Once
queue := func(pkg *Package) {
Expand All @@ -129,7 +131,16 @@ func (c *Controller) Submit(req *adminv1.CreatePackageRequest) (*Package, error)
})
}

pkg, err := NewTransferPackage(c.groupCtx, c.logger.WithName("package"), c.store, c.ssclient, c.sharedDir, req, queue)
pkg, err := NewTransferPackage( //nolint: contextcheck
// ctx is request-scoped, use the group context instead.
authn.SetInfo(c.groupCtx, authn.GetInfo(ctx)),
c.logger.WithName("package"),
c.store,
c.ssclient,
c.sharedDir,
req,
queue,
)
if err != nil {
return nil, fmt.Errorf("create package: %v", err)
}
Expand Down Expand Up @@ -390,7 +401,9 @@ func (c *Controller) Decisions() map[uuid.UUID][]*adminv1.Decision {
return ret
}

func (c *Controller) ResolveDecision(id uuid.UUID, pos int) error {
func (c *Controller) ResolveDecision(ctx context.Context, id uuid.UUID, pos int) (err error) {
defer derrors.Add(&err, "ResolveDecision()")

c.mu.RLock()
defer c.mu.RUnlock()

Expand All @@ -408,10 +421,14 @@ func (c *Controller) ResolveDecision(id uuid.UUID, pos int) error {
return errors.New("decision cannot be found")
}

if err := match.pkg.updateActiveAgent(ctx); err != nil {
return fmt.Errorf("update active agent: %v", err)
}

return match.resolveWithPos(pos)
}

func (c *Controller) ResolveDecisionLegacy(jobID uuid.UUID, choice string) error {
func (c *Controller) ResolveDecisionLegacy(ctx context.Context, jobID uuid.UUID, choice string) error {
c.mu.RLock()
defer c.mu.RUnlock()

Expand All @@ -429,6 +446,10 @@ func (c *Controller) ResolveDecisionLegacy(jobID uuid.UUID, choice string) error
return errors.New("package is not awaiting")
}

if err := match.pkg.updateActiveAgent(ctx); err != nil {
return fmt.Errorf("update active agent: %v", err)
}

// We attempt to read the choice as an integer describing the position of
// the decision to choose.
if pos, err := strconv.Atoi(choice); err == nil {
Expand Down
16 changes: 13 additions & 3 deletions hack/ccp/internal/controller/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"maps"
"os"
"path/filepath"
"strconv"
"strings"

"connectrpc.com/authn"
"github.com/go-logr/logr"
"github.com/google/uuid"

Expand Down Expand Up @@ -148,7 +150,7 @@ func NewTransferPackage(
return nil, err
}

if err := pkg.updateActiveAgent(ctx, "TODO"); err != nil {
if err := pkg.updateActiveAgent(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -368,8 +370,16 @@ func (p *Package) markAsFailed(ctx context.Context) error {
return p.store.UpdatePackageStatus(ctx, p.id, p.packageType(), enums.PackageStatusFailed)
}

func (p *Package) updateActiveAgent(ctx context.Context, userID string) error {
return nil // TODO: we have not implemented auth yet!
// updateActiveAgent saves the activeAgent variable using the user information
// that is provided in the context.
func (p *Package) updateActiveAgent(ctx context.Context) error {
info := authn.GetInfo(ctx)
user, ok := info.(*store.User)
if !ok || user == nil || user.AgentID == nil {
return nil
}

return p.saveValue(ctx, "activeAgent", strconv.Itoa(*user.AgentID))
}

// unit represents logic that is specific to a particular type of package, e.g. Transfer.
Expand Down
23 changes: 18 additions & 5 deletions hack/ccp/internal/store/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,18 +975,31 @@ func (s *mysqlStoreImpl) ReadStorageServiceConfig(ctx context.Context) (ret Stor
return ret, nil
}

func (s *mysqlStoreImpl) ValidateUserAPIKey(ctx context.Context, username, key string) (_ bool, err error) {
func (s *mysqlStoreImpl) ValidateUserAPIKey(ctx context.Context, username, key string) (_ *User, err error) {
defer wrap(&err, "ValidateUserAPIKey(%q, %q)", username, key)

_, err = s.queries.ReadUserWithKey(ctx, &sqlc.ReadUserWithKeyParams{
row, err := s.queries.ReadUserWithKey(ctx, &sqlc.ReadUserWithKeyParams{
Username: username,
Key: key,
})
if err == sql.ErrNoRows {
return false, nil
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
if err != nil {
return nil, err
}

return true, nil
ret := &User{
ID: int(row.ID),
Username: row.Username,
Email: row.Email,
Active: row.IsActive,
}
if row.AgentID.Valid {
ret.AgentID = ref.New(int(row.AgentID.Int32))
}

return ret, nil
}

func (s *mysqlStoreImpl) Running() bool {
Expand Down
13 changes: 11 additions & 2 deletions hack/ccp/internal/store/sqlcmysql/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions hack/ccp/internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,11 @@ type Store interface {
// Archivematica Storage Service associated to this pipeline.
ReadStorageServiceConfig(ctx context.Context) (StorageServiceConfig, error)

// ValidateUserAPIKey confirms that the username with the given API key
// exists in the database and is active.
ValidateUserAPIKey(ctx context.Context, username, key string) (bool, error)
// ValidateUserAPIKey checks if a user with the given username and API key
// exists and is active. It returns a pointer to the User if valid, or nil
// and an error otherwise. A nil User doesn't necessarily mean the user
// doesn't exist; check the error for details.
ValidateUserAPIKey(ctx context.Context, username, key string) (*User, error)

Running() bool
Close() error
Expand Down Expand Up @@ -217,3 +219,11 @@ type FindAwaitingJobParams struct {
PackageID *uuid.UUID
Group *string
}

type User struct {
ID int
Username string
Email string
Active bool
AgentID *int
}
10 changes: 5 additions & 5 deletions hack/ccp/internal/store/storemock/mock_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion hack/ccp/sqlc/mysql/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ SELECT name, value, scope FROM DashboardSettings WHERE name = ?;
--

-- name: ReadUserWithKey :one
SELECT auth_user.id, auth_user.username, auth_user.is_active
SELECT auth_user.id, auth_user.username, auth_user.email, auth_user.is_active, main_userprofile.agent_id
FROM auth_user
JOIN tastypie_apikey ON auth_user.id = tastypie_apikey.user_id
LEFT JOIN main_userprofile ON auth_user.id = main_userprofile.user_id
WHERE auth_user.username = ? AND tastypie_apikey.key = ? AND auth_user.is_active = 1
LIMIT 1;

0 comments on commit 649d552

Please sign in to comment.