Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WPT data consumption workflow #10

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The above skaffold command deploys multiple resources:
| gcs | Google Cloud Storage Emulator | N/A | http://gcs:4443 |
| repo-downloader | Repo Downloader Workflow Step in<br />./workflows/steps/services/common/repo_downloader | http://localhost:8091 | http://repo-downloader:8080 |
| web-feature-consumer | Web Feature Consumer Step in<br />./workflows/steps/services/web_feature_consumer | http://localhost:8092 | http://web-feature-consumer:8080 |
| wpt-consumer | WPT Consumer Step in<br />./workflows/steps/services/wpt_consumer | http://localhost:8093 | http://wpt-consumer:8080 |

_In the event the servers are not responsive, make a temporary change to a file_
_in a watched directory (e.g. backend). This will rebuild and expose the_
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ go-openapi: $(OPENAPI_OUT_DIR)/backend/types.gen.go \
$(OPENAPI_OUT_DIR)/workflows/steps/web_feature_consumer/client.gen.go \
$(OPENAPI_OUT_DIR)/workflows/steps/web_feature_consumer/types.gen.go \
$(OPENAPI_OUT_DIR)/workflows/steps/web_feature_consumer/server.gen.go \
$(OPENAPI_OUT_DIR)/workflows/steps/wpt_consumer/client.gen.go \
$(OPENAPI_OUT_DIR)/workflows/steps/wpt_consumer/types.gen.go \
$(OPENAPI_OUT_DIR)/workflows/steps/wpt_consumer/server.gen.go \
$(OPENAPI_OUT_DIR)/workflows/steps/common/repo_downloader/client.gen.go \
$(OPENAPI_OUT_DIR)/workflows/steps/common/repo_downloader/types.gen.go \
$(OPENAPI_OUT_DIR)/workflows/steps/common/repo_downloader/server.gen.go
Expand Down
2 changes: 1 addition & 1 deletion backend/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func main() {
if value, found := os.LookupEnv("DATASTORE_DATABASE"); found {
datastoreDB = &value
}
fs, err := gds.NewWebFeatureClient(os.Getenv("PROJECT_ID"), datastoreDB)
fs, err := gds.NewDatastoreClient(os.Getenv("PROJECT_ID"), datastoreDB)
if err != nil {
slog.Error("failed to create datastore client", "error", err.Error())
os.Exit(1)
Expand Down
13 changes: 9 additions & 4 deletions backend/pkg/httpserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,26 @@ import (
)

type WebFeatureMetadataStorer interface {
List(ctx context.Context) ([]backend.Feature, error)
Get(ctx context.Context, featureID string) (*backend.Feature, error)
ListWebFeataureData(ctx context.Context, nextPageToken *string) ([]backend.Feature, *string, error)
GetWebFeatureData(ctx context.Context, featureID string) (*backend.Feature, error)
}

type Server struct {
metadataStorer WebFeatureMetadataStorer
}

// V1ListFeatureMetricsByBrowserAndChannel implements backend.StrictServerInterface.
func (*Server) V1ListFeatureMetricsByBrowserAndChannel(ctx context.Context, request backend.V1ListFeatureMetricsByBrowserAndChannelRequestObject) (backend.V1ListFeatureMetricsByBrowserAndChannelResponseObject, error) {
panic("unimplemented")
}

// GetV1FeaturesFeatureId implements backend.StrictServerInterface.
// nolint: revive, ireturn // Name generated from openapi
func (s *Server) GetV1FeaturesFeatureId(
ctx context.Context,
request backend.GetV1FeaturesFeatureIdRequestObject,
) (backend.GetV1FeaturesFeatureIdResponseObject, error) {
feature, err := s.metadataStorer.Get(ctx, request.FeatureId)
feature, err := s.metadataStorer.GetWebFeatureData(ctx, request.FeatureId)
if err != nil {
slog.Error("unable to get feature", "error", err)

Expand All @@ -61,7 +66,7 @@ func (s *Server) GetV1Features(
ctx context.Context,
_ backend.GetV1FeaturesRequestObject,
) (backend.GetV1FeaturesResponseObject, error) {
featureData, err := s.metadataStorer.List(ctx)
featureData, _, err := s.metadataStorer.ListWebFeataureData(ctx, nil)
if err != nil {
// TODO check error type
slog.Error("unable to get list of features", "error", err)
Expand Down
138 changes: 81 additions & 57 deletions lib/gds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ import (
"log/slog"

"cloud.google.com/go/datastore"
"github.com/GoogleChrome/webstatus.dev/lib/gen/jsonschema/web_platform_dx__web_features"
"github.com/GoogleChrome/webstatus.dev/lib/gen/openapi/backend"
"google.golang.org/api/iterator"
)

const featureDataKey = "FeatureDataTest"

type Client struct {
*datastore.Client
}

func NewWebFeatureClient(projectID string, database *string) (*Client, error) {
// NewDatastoreClient returns a Client for the Google Datastore service.
func NewDatastoreClient(projectID string, database *string) (*Client, error) {
if projectID == "" {
return nil, errors.New("projectID is empty")
}
Expand All @@ -53,53 +51,58 @@ func NewWebFeatureClient(projectID string, database *string) (*Client, error) {
return &Client{client}, nil
}

type FeatureData struct {
WebFeatureID string `datastore:"web_feature_id"`
Name string `datastore:"name"`
id int64 // The integer ID used in the datastore.
// Filterable modifies a query with a given filter.
type Filterable interface {
FilterQuery(*datastore.Query) *datastore.Query
}

// entityClient is generic client that contains generic methods that can apply
// to any entity stored in datastore.
type entityClient[T any] struct {
*Client
}

func (f FeatureData) ID() int64 {
return f.id
type Mergeable[T any] interface {
Merge(existing *T, new *T) *T
}

func (c *Client) Upsert(
func (c *entityClient[T]) upsert(
ctx context.Context,
webFeatureID string,
data web_platform_dx__web_features.FeatureData,
) error {
kind string,
data *T,
mergeable Mergeable[T],
filterables ...Filterable) error {
// Begin a transaction.
_, err := c.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
// Get the entity, if it exists.
var entity []FeatureData
query := datastore.NewQuery(featureDataKey).FilterField("web_feature_id", "=", webFeatureID).Transaction(tx)
var existingEntity []T
query := datastore.NewQuery(kind)
for _, filterable := range filterables {
query = filterable.FilterQuery(query)
}
query = query.Limit(1).Transaction(tx)

keys, err := c.GetAll(ctx, query, &entity)
keys, err := c.GetAll(ctx, query, &existingEntity)
if err != nil && !errors.Is(err, datastore.ErrNoSuchEntity) {
slog.Error("unable to check for existing entities", "error", err)

return err
}

var key *datastore.Key
// If the entity exists, update it.
// If the entity exists, merge the two entities.
if len(keys) > 0 {
key = keys[0]

data = mergeable.Merge(&existingEntity[0], data)
} else {
// If the entity does not exist, insert it.
key = datastore.IncompleteKey(featureDataKey, nil)
key = datastore.IncompleteKey(kind, nil)
}

// nolint: exhaustruct // id does not exist yet
feature := &FeatureData{
WebFeatureID: webFeatureID,
Name: data.Name,
}
_, err = tx.Put(key, feature)
_, err = tx.Put(key, data)
if err != nil {
// Handle any errors in an appropriate way, such as returning them.
slog.Error("unable to upsert metadata", "error", err)
slog.Error("unable to upsert entity", "error", err)

return err
}
Expand All @@ -116,42 +119,63 @@ func (c *Client) Upsert(
return nil
}

func (c *Client) List(ctx context.Context) ([]backend.Feature, error) {
var featureData []*FeatureData
_, err := c.GetAll(ctx, datastore.NewQuery(featureDataKey), &featureData)
if err != nil {
return nil, err
func (c entityClient[T]) list(
ctx context.Context,
kind string,
pageToken *string,
filterables ...Filterable) ([]*T, *string, error) {
var data []*T
query := datastore.NewQuery(kind)
if pageToken != nil {
cursor, err := datastore.DecodeCursor(*pageToken)
if err != nil {
return nil, nil, err
}
query = query.Start(cursor)
}
for _, filterable := range filterables {
query = filterable.FilterQuery(query)
}
ret := make([]backend.Feature, len(featureData))

// nolint: exhaustruct
// TODO. Will fix this lint error once the data is coming in.
for idx, val := range featureData {
ret[idx] = backend.Feature{
FeatureId: val.WebFeatureID,
Name: val.Name,
Spec: nil,
it := c.Run(ctx, query)
for {
var entity T
_, err := it.Next(&entity)
if errors.Is(err, iterator.Done) {
cursor, err := it.Cursor()
if err != nil {
// TODO: Handle error.
return nil, nil, err
}
nextToken := cursor.String()

return data, &nextToken, nil
}
if err != nil {
return nil, nil, err
}
data = append(data, &entity)
}

return ret, nil
}

func (c *Client) Get(ctx context.Context, webFeatureID string) (*backend.Feature, error) {
var featureData []*FeatureData
_, err := c.GetAll(
ctx, datastore.NewQuery(featureDataKey).
FilterField("web_feature_id", "=", webFeatureID).Limit(1),
&featureData)
var ErrEntityNotFound = errors.New("queried entity not found")

func (c entityClient[T]) get(ctx context.Context, kind string, filterables ...Filterable) (*T, error) {
var data []*T
query := datastore.NewQuery(kind)
for _, filterable := range filterables {
query = filterable.FilterQuery(query)
}
query = query.Limit(1)
_, err := c.GetAll(ctx, query, &data)
if err != nil {
slog.Error("failed to list data", "error", err, "kind", kind)

return nil, err
}

// nolint: exhaustruct
// TODO. Will fix this lint error once the data is coming in.
return &backend.Feature{
Name: featureData[0].WebFeatureID,
FeatureId: featureData[0].WebFeatureID,
Spec: nil,
}, nil
if len(data) < 1 {
return nil, ErrEntityNotFound
}

return data[0], nil
}
Loading
Loading