Skip to content

Commit

Permalink
Kamu 168 set transform add event from UI (#106)
Browse files Browse the repository at this point in the history
* Add SetTransformComponent.
* Add search and fix graphql schema.
* Add material-tree module.
* Add schema for input datasets.
* Add templates for the queries.
* Add the ability to edit queries and to save event.
* Add navigation for input datasets.
* Add unit tests.
* Add owner name to the input datasets tree node.
  • Loading branch information
dmitriy-borzenko authored Jul 6, 2023
1 parent bdc45ff commit 75dbd5a
Show file tree
Hide file tree
Showing 64 changed files with 2,444 additions and 158 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Merge step
- The ability to edit SetPollingSource event
- Editing SetWaterMark event
- Editing SetTransform event

## [0.6.0] - 2023-02-27

Expand Down
200 changes: 189 additions & 11 deletions resources/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ type Accounts {

type AddData {
inputCheckpoint: Multihash
outputData: DataSlice!
outputData: DataSlice
outputCheckpoint: Checkpoint
outputWatermark: DateTime
sourceState: SourceState
}

type AttachmentEmbedded {
Expand Down Expand Up @@ -135,6 +136,10 @@ type DataQueries {
Executes a specified query and returns its result
"""
query(query: String!, queryDialect: QueryDialect!, dataFormat: DataBatchFormat, schemaFormat: DataSchemaFormat, limit: Int): DataQueryResult!
"""
Lists engines known to the system and recommended for use
"""
knownEngines: [EngineDesc!]!
}

union DataQueryResult = DataQueryResultSuccess | DataQueryResultError
Expand Down Expand Up @@ -179,7 +184,8 @@ type Dataset {
id: DatasetID!
"""
Symbolic name of the dataset.
Name can change over the dataset's lifetime. For unique identifier use `id()`.
Name can change over the dataset's lifetime. For unique identifier use
`id()`.
"""
name: DatasetName!
"""
Expand Down Expand Up @@ -230,12 +236,14 @@ type DatasetData {
"""
numRecordsTotal: Int!
"""
An estimated size of data on disk not accounting for replication or caching
An estimated size of data on disk not accounting for replication or
caching
"""
estimatedSize: Int!
"""
Returns the specified number of the latest records in the dataset
This is equivalent to the SQL query: `SELECT * FROM dataset ORDER BY event_time DESC LIMIT N`
This is equivalent to the SQL query: `SELECT * FROM dataset ORDER BY
event_time DESC LIMIT N`
"""
tail(limit: Int, dataFormat: DataBatchFormat, schemaFormat: DataSchemaFormat): DataQueryResult!
}
Expand Down Expand Up @@ -285,7 +293,8 @@ type DatasetMetadata {
"""
currentInfo: SetInfo!
"""
Current readme file as discovered from attachments associated with the dataset
Current readme file as discovered from attachments associated with the
dataset
"""
currentReadme: String
"""
Expand Down Expand Up @@ -334,6 +343,27 @@ The input/output is a string in RFC3339 format.
"""
scalar DateTime

"""
Describes
"""
type EngineDesc {
"""
A short name of the engine, e.g. "Spark", "Flink".
Intended for use in UI for quick engine identification and selection.
"""
name: String!
"""
Language and dialect this engine is using for queries
Indended for configuring code highlighting and completions.
"""
dialect: QueryDialect!
"""
OCI image repository and a tag of the latest engine image, e.g.
"ghcr.io/kamu-data/engine-datafusion:0.1.2"
"""
latestImage: String!
}

type EnvVar {
name: String!
value: String
Expand Down Expand Up @@ -453,7 +483,8 @@ type MetadataChain {
"""
blockByHash(hash: Multihash!): MetadataBlockExtended
"""
Returns a metadata block corresponding to the specified hash and encoded in desired format
Returns a metadata block corresponding to the specified hash and encoded
in desired format
"""
blockByHashEncoded(hash: Multihash!, format: MetadataManifestFormat!): String
"""
Expand Down Expand Up @@ -484,6 +515,7 @@ scalar Multihash

type Mutation {
  """
  Authentication-related functionality group
  """
  auth: Auth!
  """
  Task-related functionality group (see TasksMutations)
  """
  tasks: TasksMutations!
}

type OffsetInterval {
Expand Down Expand Up @@ -516,7 +548,8 @@ type PageBasedInfo {
"""
currentPage: Int!
"""
Approximate number of total pages assuming number of nodes per page stays the same
Approximate number of total pages assuming number of nodes per page
stays the same
"""
totalPages: Int
}
Expand All @@ -538,15 +571,29 @@ type Query {
"""
apiVersion: String!
"""
Dataset-related functionality group
Dataset-related functionality group.
Datasets are historical streams of events recorded under a certain
schema.
"""
datasets: Datasets!
"""
Account-related functionality group
Account-related functionality group.
Accounts can be individual users or organizations registered in the
system. This groups deals with their identities and permissions.
"""
accounts: Accounts!
"""
Search-related functionality group
Task-related functionality group.
Tasks are units of scheduling that can perform many functions like
ingesting new data, running dataset transformations, answering ad-hoc
queries etc.
"""
tasks: Tasks!
"""
Search-related functionality group.
"""
search: Search!
"""
Expand All @@ -556,7 +603,9 @@ type Query {
}

enum QueryDialect {
DATA_FUSION
SQL_SPARK
SQL_FLINK
SQL_DATA_FUSION
}

union ReadStep = ReadStepCsv | ReadStepJsonLines | ReadStepGeoJson | ReadStepEsriShapefile | ReadStepParquet
Expand Down Expand Up @@ -694,12 +743,141 @@ enum SourceOrdering {
BY_NAME
}

"""
State of the source the data was ingested from
"""
# NOTE(review): the semantics of kind/source/value are not evident from this
# schema alone — confirm against the backend documentation
type SourceState {
  kind: String!
  source: String!
  value: String!
}

"""
A single step of a transformation expressed as an SQL query
"""
type SqlQueryStep {
  # NOTE(review): presumably an alias under which the query result can be
  # referenced by subsequent steps — verify against the transform engine docs
  alias: String
  query: String!
}


type Task {
  """
  Unique and stable identifier of this task
  """
  taskId: TaskID!
  """
  Life-cycle status of a task
  """
  status: TaskStatus!
  """
  Whether the task was ordered to be cancelled
  """
  cancellationRequested: Boolean!
  """
  Describes a certain final outcome of the task once it reaches the
  "finished" status
  """
  outcome: TaskOutcome
  """
  Time when task was originally created and placed in a queue
  """
  createdAt: DateTime!
  """
  Time when task transitioned into a running state
  """
  ranAt: DateTime
  """
  Time when cancellation of task was requested
  """
  cancellationRequestedAt: DateTime
  """
  Time when task has reached a final outcome
  """
  finishedAt: DateTime
}

type TaskConnection {
  """
  A shorthand for `edges { node { ... } }`
  """
  nodes: [Task!]!
  """
  Approximate number of total nodes
  """
  totalCount: Int!
  """
  Page information
  """
  pageInfo: PageBasedInfo!
  """
  Edge objects wrapping each node
  """
  edges: [TaskEdge!]!
}

"""
An edge of a TaskConnection that wraps a single Task node
"""
type TaskEdge {
  node: Task!
}

"""
Unique and stable identifier of a task (see Task.taskId)
"""
scalar TaskID

"""
Describes a certain final outcome of the task
"""
enum TaskOutcome {
"""
Task succeeded
"""
SUCCESS
"""
Task failed to complete
"""
FAILED
"""
Task was cancelled by a user
"""
CANCELLED
}

"""
Life-cycle status of a task
"""
enum TaskStatus {
"""
Task is waiting for capacity to be allocated to it
"""
QUEUED
"""
Task is being executed
"""
RUNNING
"""
Task has reached a certain final outcome (see [TaskOutcome])
"""
FINISHED
}

type Tasks {
  """
  Returns current state of a given task
  """
  getTask(taskId: TaskID!): Task
  """
  Returns states of tasks associated with a given dataset ordered by
  creation time from newest to oldest (paginated via `page`/`perPage`)
  """
  listTasksByDataset(datasetId: DatasetID!, page: Int, perPage: Int): TaskConnection!
}

type TasksMutations {
  """
  Requests cancellation of the specified task
  """
  cancelTask(taskId: TaskID!): Task!
  """
  Schedules a task to update the specified dataset by performing polling
  ingest or a derivative transformation
  """
  createUpdateDatasetTask(datasetId: DatasetID!): Task!
  """
  Schedules a probe task that simulates being busy for `busyTimeMs` and
  finishes with `endWithOutcome` (useful for testing the task system)
  """
  # NOTE(review): the original description here was copy-pasted from
  # createUpdateDatasetTask; the text above is inferred from the argument
  # names — confirm against the backend implementation
  createProbeTask(datasetId: DatasetID, busyTimeMs: Int, endWithOutcome: TaskOutcome): Task!
}

type TemporalTable {
name: String!
primaryKey: [String!]!
Expand Down
18 changes: 18 additions & 0 deletions src/app/api/dataset.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {
CreateDatasetFromSnapshotQuery,
CreateEmptyDatasetQuery,
DatasetKind,
GetDatasetSchemaGQL,
GetDatasetSchemaQuery,
} from "src/app/api/kamu.graphql.interface";
import AppValues from "src/app/common/app.values";
import { ApolloQueryResult } from "@apollo/client/core";
Expand Down Expand Up @@ -41,6 +43,7 @@ export class DatasetApi {
private createEmptyDatasetGQL: CreateEmptyDatasetGQL,
private createDatasetFromSnapshotGQL: CreateDatasetFromSnapshotGQL,
private commitEventToDataset: CommitEventToDatasetGQL,
private datasetSchemaGQL: GetDatasetSchemaGQL,
) {}

public getDatasetMainData(params: {
Expand Down Expand Up @@ -97,6 +100,21 @@ export class DatasetApi {
);
}

/**
 * Fetches the schema of the specified dataset.
 * Emits once (the first value of the Apollo watch query) and completes.
 */
public getDatasetSchema(
    datasetId: string,
): Observable<GetDatasetSchemaQuery> {
    const schema$ = this.datasetSchemaGQL.watch({ datasetId }).valueChanges;
    return schema$.pipe(
        first(),
        map((response: ApolloQueryResult<GetDatasetSchemaQuery>) => response.data),
    );
}

public fetchDatasetsByAccountName(
accountName: string,
page = 0,
Expand Down
20 changes: 20 additions & 0 deletions src/app/api/engine.api.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { TestBed } from "@angular/core/testing";
import { ApolloTestingModule } from "apollo-angular/testing";
import { EngineApi } from "./engine.api";
import { Apollo, ApolloModule } from "apollo-angular";

describe("EngineApi", () => {
let service: EngineApi;

beforeEach(() => {
TestBed.configureTestingModule({
providers: [EngineApi, Apollo],
imports: [ApolloModule, ApolloTestingModule],
});
service = TestBed.inject(EngineApi);
});

it("should be created", () => {
expect(service).toBeTruthy();
});
});
21 changes: 21 additions & 0 deletions src/app/api/engine.api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Injectable } from "@angular/core";
import { EnginesGQL, EnginesQuery } from "./kamu.graphql.interface";
import { ApolloQueryResult } from "@apollo/client";
import { Observable } from "rxjs";
import { first, map } from "rxjs/operators";

@Injectable({
    providedIn: "root",
})
export class EngineApi {
    constructor(private enginesGQL: EnginesGQL) {}

    /**
     * Fetches the list of engines from the backend API.
     * Emits once (the first value of the Apollo watch query) and completes.
     */
    public getEngines(): Observable<EnginesQuery> {
        const engines$ = this.enginesGQL.watch().valueChanges;
        return engines$.pipe(
            first(),
            map((response: ApolloQueryResult<EnginesQuery>) => response.data),
        );
    }
}
Loading

0 comments on commit 75dbd5a

Please sign in to comment.