diff --git a/service/jobs/ext_api.go b/service/jobs/ext_api.go index 415ac457..42417150 100644 --- a/service/jobs/ext_api.go +++ b/service/jobs/ext_api.go @@ -1 +1,34 @@ package jobs + +import "context" + +// GetRun retrieves a run based on the provided request. +// It handles pagination if the run contains multiple iterations or tasks. +func (a *JobsAPI) GetRun(ctx context.Context, request GetRunRequest) (*Run, error) { + run, err := a.jobsImpl.GetRun(ctx, request) + if err != nil { + return nil, err + } + + // When querying a Job run, a page token is returned when there are more than 100 tasks. No iterations are defined for a Job run. Therefore, the next page in the response only includes the next page of tasks. + // When querying a ForEach task run, a page token is returned when there are more than 100 iterations. Only a single task is returned, corresponding to the ForEach task itself. Therefore, the client only reads the iterations from the next page and not the tasks. + isPaginatingIterations := run.Iterations != nil && len(run.Iterations) > 0 + + pageToken := run.NextPageToken + for pageToken != "" { + request.PageToken = pageToken + nextRun, err := a.jobsImpl.GetRun(ctx, request) + if err != nil { + return nil, err + } + + if isPaginatingIterations { + run.Iterations = append(run.Iterations, nextRun.Iterations...) + } else { + run.Tasks = append(run.Tasks, nextRun.Tasks...) + } + pageToken = nextRun.NextPageToken + } + + return run, nil +} diff --git a/service/jobs/ext_api_test.go b/service/jobs/ext_api_test.go index 415ac457..e13a437f 100644 --- a/service/jobs/ext_api_test.go +++ b/service/jobs/ext_api_test.go @@ -1 +1,371 @@ package jobs + +import ( + "context" + "testing" + + "github.com/databricks/databricks-sdk-go/qa" + + "github.com/stretchr/testify/assert" +) + +func TestGetRun(t *testing.T) { + ctx := context.Background() + + t.Run("run with no pagination", func(t *testing.T) { + var requestMocks qa.HTTPFixtures = []qa.HTTPFixture{ + { + Method: "GET", + Resource: "/api/2.2/jobs/runs/get?run_id=514594995218126", + Response: Run{ + Iterations: []RunTask{}, + Tasks: []RunTask{ + { + RunId: 123, + TaskKey: "task1", + }, + { + RunId: 1234, + TaskKey: "task2", + }, + }, + NextPageToken: "", + }, + }, + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.1/jobs/runs/get?run_id=514594995218126", + Response: Run{ + Iterations: []RunTask{}, + Tasks: []RunTask{ + { + RunId: 123, + TaskKey: "task1", + }, + { + RunId: 1234, + TaskKey: "task2", + }, + }, + }, + }, + } + client, server := requestMocks.Client(t) + defer server.Close() + + mockJobsImpl := &jobsImpl{ + client: client, + } + api := &JobsAPI{jobsImpl: *mockJobsImpl} + + request := GetRunRequest{RunId: 514594995218126} + run, err := api.GetRun(ctx, request) + + assert.NoError(t, err) + assert.Equal(t, 2, len(run.Tasks)) + assert.Empty(t, run.Iterations) + assert.EqualValues(t, 123, run.Tasks[0].RunId) + assert.EqualValues(t, 1234, run.Tasks[1].RunId) + }) + + t.Run("run with two tasks pages", func(t *testing.T) { + var requestMocks qa.HTTPFixtures = []qa.HTTPFixture{ + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.2/jobs/runs/get?run_id=111222333", + Response: Run{ + Iterations: []RunTask{}, + Tasks: []RunTask{ + { + RunId: 123, + }, + { + RunId: 1234, + }, + }, + JobClusters: []JobCluster{ + { + JobClusterKey: "cluster1", + }, + { + JobClusterKey: "cluster2", + }, + }, + NextPageToken: "token1", + }, + }, + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.2/jobs/runs/get?page_token=token1&run_id=111222333", + Response: Run{ + Iterations: []RunTask{}, + Tasks: []RunTask{ + { + RunId: 222, + }, + { + RunId: 333, + }, + }, + JobClusters: []JobCluster{ + { + JobClusterKey: "cluster1", + }, + { + JobClusterKey: "cluster2", + }, + }, + }, + }, + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.1/jobs/runs/get?run_id=111222333", + Response: Run{ + Iterations: []RunTask{}, + Tasks: []RunTask{ + { + RunId: 123, + }, + { + RunId: 1234, + }, + { + RunId: 222, + }, + { + RunId: 333, + }, + }, + JobClusters: []JobCluster{ + { + JobClusterKey: "cluster1", + }, + { + JobClusterKey: "cluster2", + }, + }, + }, + }, + } + client, server := requestMocks.Client(t) + defer server.Close() + + mockJobsImpl := &jobsImpl{ + client: client, + } + api := &JobsAPI{jobsImpl: *mockJobsImpl} + + request := GetRunRequest{RunId: 111222333} + run, err := api.GetRun(ctx, request) + + assert.NoError(t, err) + assert.Equal(t, 4, len(run.Tasks)) + assert.Empty(t, run.Iterations) + assert.Empty(t, run.NextPageToken) + expected := []RunTask{ + {RunId: 123, ForceSendFields: []string{"RunId", "TaskKey"}}, + {RunId: 1234, ForceSendFields: []string{"RunId", "TaskKey"}}, + {RunId: 222, ForceSendFields: []string{"RunId", "TaskKey"}}, + {RunId: 333, ForceSendFields: []string{"RunId", "TaskKey"}}, + } + assert.Equal(t, expected, run.Tasks) + }) + + t.Run("clusters array is not increased when paginated", func(t *testing.T) { + var requestMocks qa.HTTPFixtures = []qa.HTTPFixture{ + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.2/jobs/runs/get?run_id=111222333", + Response: Run{ + Iterations: []RunTask{}, + Tasks: []RunTask{ + { + RunId: 123, + }, + { + RunId: 1234, + }, + }, + JobClusters: []JobCluster{ + { + JobClusterKey: "cluster1", + }, + { + JobClusterKey: "cluster2", + }, + }, + NextPageToken: "token1", + }, + }, + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.2/jobs/runs/get?page_token=token1&run_id=111222333", + Response: Run{ + Iterations: []RunTask{}, + Tasks: []RunTask{ + { + RunId: 222, + }, + { + RunId: 333, + }, + }, + JobClusters: []JobCluster{ + { + JobClusterKey: "cluster1", + }, + { + JobClusterKey: "cluster2", + }, + }, + }, + }, + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.1/jobs/runs/get?run_id=111222333", + Response: Run{ + Iterations: []RunTask{}, + Tasks: []RunTask{ + { + RunId: 123, + }, + { + RunId: 1234, + }, + { + RunId: 222, + }, + { + RunId: 333, + }, + }, + JobClusters: []JobCluster{ + { + JobClusterKey: "cluster1", + }, + { + JobClusterKey: "cluster2", + }, + }, + }, + }, + } + client, server := requestMocks.Client(t) + defer server.Close() + + mockJobsImpl := &jobsImpl{ + client: client, + } + api := &JobsAPI{jobsImpl: *mockJobsImpl} + + request := GetRunRequest{RunId: 111222333} + run, err := api.GetRun(ctx, request) + + assert.NoError(t, err) + assert.Equal(t, 2, len(run.JobClusters)) + assert.Equal(t, "cluster1", run.JobClusters[0].JobClusterKey) + assert.Equal(t, "cluster2", run.JobClusters[1].JobClusterKey) + }) + + t.Run("run with two iterations pages", func(t *testing.T) { + var requestMocks qa.HTTPFixtures = []qa.HTTPFixture{ + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.2/jobs/runs/get?run_id=4444", + Response: Run{ + Iterations: []RunTask{ + { + RunId: 123, + }, + { + RunId: 1234, + }, + }, + Tasks: []RunTask{ + { + RunId: 999, + }, + }, + NextPageToken: "token1", + }, + }, + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.2/jobs/runs/get?page_token=token1&run_id=4444", + Response: Run{ + Iterations: []RunTask{ + { + RunId: 222, + }, + { + RunId: 333, + }, + }, + Tasks: []RunTask{ + { + RunId: 999, + }, + }, + }, + }, + { + Method: "GET", + ReuseRequest: true, + Resource: "/api/2.1/jobs/runs/get?run_id=4444", + Response: Run{ + Iterations: []RunTask{ + { + RunId: 123, + }, + { + RunId: 1234, + }, + { + RunId: 222, + }, + { + RunId: 333, + }, + }, + Tasks: []RunTask{ + { + RunId: 999, + }, + }, + }, + }, + } + client, server := requestMocks.Client(t) + defer server.Close() + + mockJobsImpl := &jobsImpl{ + client: client, + } + api := &JobsAPI{jobsImpl: *mockJobsImpl} + + request := GetRunRequest{RunId: 4444} + run, err := api.GetRun(ctx, request) + + assert.NoError(t, err) + assert.Equal(t, 4, len(run.Iterations)) + assert.Equal(t, 1, len(run.Tasks)) + assert.Empty(t, run.NextPageToken) + expected := []RunTask{ + {RunId: 123, ForceSendFields: []string{"RunId", "TaskKey"}}, + {RunId: 1234, ForceSendFields: []string{"RunId", "TaskKey"}}, + {RunId: 222, ForceSendFields: []string{"RunId", "TaskKey"}}, + {RunId: 333, ForceSendFields: []string{"RunId", "TaskKey"}}, + } + assert.Equal(t, expected, run.Iterations) + assert.EqualValues(t, 999, run.Tasks[0].RunId) + }) +}