Skip to content

Commit

Permalink
cherry-pic #6443 Prevent oversized payloads from Python's RemoteScope…
Browse files Browse the repository at this point in the history
…s API to v0.19 (#6444)

* fix: use streaming for Python->Go RemoteScopes API to prevent oversized payloads (#6443)

Co-authored-by: Keon Amini <keon.amini@merico.dev>

* fix: linting

---------

Co-authored-by: Keon Amini <keon.a380@gmail.com>
Co-authored-by: Keon Amini <keon.amini@merico.dev>
  • Loading branch information
3 people authored Nov 13, 2023
1 parent 06ab72e commit d52d6c9
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 62 deletions.
27 changes: 15 additions & 12 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/go-git/go-git/v5 v5.9.0
github.com/go-playground/validator/v10 v10.14.1
github.com/gocarina/gocsv v0.0.0-20220707092902-b9da1f06c77e
github.com/google/uuid v1.3.0
github.com/google/uuid v1.3.1
github.com/jackc/pgx/v5 v5.3.1 // indirect
github.com/lib/pq v1.10.2
github.com/libgit2/git2go/v33 v33.0.6
Expand All @@ -24,17 +24,15 @@ require (
github.com/spf13/cast v1.4.1
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.8.1
github.com/stretchr/testify v1.8.3
github.com/stretchr/testify v1.8.4
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.1
github.com/tidwall/gjson v1.14.3
github.com/viant/afs v1.16.0
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
go.temporal.io/sdk v1.14.0
golang.org/x/crypto v0.14.0
golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
golang.org/x/oauth2 v0.11.0
golang.org/x/sync v0.4.0
gorm.io/datatypes v1.0.1
gorm.io/driver/mysql v1.5.1
Expand Down Expand Up @@ -73,11 +71,12 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.0 // indirect
github.com/gogo/status v1.1.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand Down Expand Up @@ -117,15 +116,17 @@ require (
github.com/xanzy/ssh-agent v0.3.3 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/net v0.16.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf // indirect
google.golang.org/grpc v1.44.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand All @@ -134,6 +135,8 @@ require (

require (
github.com/golang-jwt/jwt/v5 v5.0.0-rc.1
go.temporal.io/api v1.25.0
go.temporal.io/sdk v1.25.1
golang.org/x/mod v0.13.0
)

Expand Down
62 changes: 29 additions & 33 deletions backend/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion backend/python/pydevlake/pydevlake/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def plugin_info(self):
@plugin_method
def remote_scopes(self, connection: dict, group_id: Optional[str] = None):
c = self._plugin.connection_type(**connection)
return self._plugin.make_remote_scopes(c, group_id)
yield from self._plugin.make_remote_scopes(c, group_id)

def _mk_context(self, data: dict):
db_url = data['db_url']
Expand Down
8 changes: 2 additions & 6 deletions backend/python/pydevlake/pydevlake/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,18 @@ def _run_stream(self, ctx: Context, stream_name: str, subtask: str):

def make_remote_scopes(self, connection: Connection, group_id: Optional[str] = None) -> msg.RemoteScopes:
if group_id:
remote_scopes = []
for tool_scope in self.remote_scopes(connection, group_id):
tool_scope.connection_id = connection.id
tool_scope.raw_data_params = raw_data_params(connection.id, tool_scope.id)
tool_scope.raw_data_table = self._raw_scope_table_name()
remote_scopes.append(
msg.RemoteScope(
yield msg.RemoteScope(
id=tool_scope.id,
parent_id=group_id,
name=tool_scope.name,
data=tool_scope
)
)
else:
remote_scopes = self.remote_scope_groups(connection)
return msg.RemoteScopes(__root__=remote_scopes)
yield from self.remote_scope_groups(connection)

def make_pipeline(self, scope_config_pairs: list[ScopeConfigPair],
connection: Connection) -> msg.PipelineData:
Expand Down
2 changes: 1 addition & 1 deletion backend/server/services/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func pipelineServiceInit() {
if temporalUrl != "" {
// TODO: logger
var err error
temporalClient, err = client.NewClient(client.Options{
temporalClient, err = client.NewLazyClient(client.Options{
HostPort: temporalUrl,
})
if err != nil {
Expand Down
26 changes: 19 additions & 7 deletions backend/server/services/remote/plugin/remote_scope_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package plugin

import (
"github.com/apache/incubator-devlake/core/models"
"net/http"
"strconv"

Expand All @@ -43,28 +44,39 @@ func (pa *pluginAPI) GetRemoteScopes(input *plugin.ApiResourceInput) (*plugin.Ap
if connectionId == 0 {
return nil, errors.BadInput.New("invalid connectionId")
}

connection := pa.connType.New()
err := pa.connhelper.First(connection, input.Params)
if err != nil {
return nil, err
}

groupId := input.Query.Get("groupId")

remoteScopes := make([]RemoteScopesTreeNode, 0)
err = pa.invoker.Call("remote-scopes", bridge.DefaultContext, connection.Unwrap(), groupId).Get(&remoteScopes)
remoteScopes, err := pa.getRemoteScopesIncrementally(connection, groupId)
if err != nil {
return nil, err
}

output := RemoteScopesOutput{
Children: remoteScopes,
}

return &plugin.ApiResourceOutput{Body: output, Status: http.StatusOK}, nil
}

func (pa *pluginAPI) getRemoteScopesIncrementally(connection models.DynamicTabler, groupId string) ([]RemoteScopesTreeNode, errors.Error) {
remoteScopes := make([]RemoteScopesTreeNode, 0)
stream := pa.invoker.Stream("remote-scopes", bridge.DefaultContext, connection.Unwrap(), groupId)
for recv := range stream.Receive() {
if recv.Err != nil {
return nil, recv.Err
}
scope := RemoteScopesTreeNode{}
err := recv.Get(&scope)
if err != nil {
return nil, err
}
remoteScopes = append(remoteScopes, scope)
}
return remoteScopes, nil
}

func (pa *pluginAPI) SearchRemoteScopes(input *plugin.ApiResourceInput) (*plugin.ApiResourceOutput, errors.Error) {
return &plugin.ApiResourceOutput{Status: http.StatusNotImplemented}, nil
}
5 changes: 3 additions & 2 deletions backend/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ limitations under the License.
package main

import (
"log"

"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/runner"
_ "github.com/apache/incubator-devlake/core/version"
"github.com/apache/incubator-devlake/impls/logruslog"
"github.com/apache/incubator-devlake/worker/app"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"log"
)

func main() {
Expand All @@ -38,7 +39,7 @@ func main() {
// establish temporal connection
TASK_QUEUE := basicRes.GetConfig("TEMPORAL_TASK_QUEUE")
// Create the client object just once per process
c, err := errors.Convert01(client.NewClient(client.Options{
c, err := errors.Convert01(client.NewLazyClient(client.Options{
HostPort: basicRes.GetConfig("TEMPORAL_URL"),
Logger: app.NewTemporalLogger(logruslog.Global),
}))
Expand Down

0 comments on commit d52d6c9

Please sign in to comment.