diff --git a/CHANGELOG.md b/CHANGELOG.md index 15675a418..f5ad3bf04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Merge step - The ability to edit SetPollingSource event - Editing SetWaterMark event +- Editing SetTransform event ## [0.6.0] - 2023-02-27 diff --git a/resources/schema.graphql b/resources/schema.graphql index f10d42011..fe6d304d0 100644 --- a/resources/schema.graphql +++ b/resources/schema.graphql @@ -34,9 +34,10 @@ type Accounts { type AddData { inputCheckpoint: Multihash - outputData: DataSlice! + outputData: DataSlice outputCheckpoint: Checkpoint outputWatermark: DateTime + sourceState: SourceState } type AttachmentEmbedded { @@ -135,6 +136,10 @@ type DataQueries { Executes a specified query and returns its result """ query(query: String!, queryDialect: QueryDialect!, dataFormat: DataBatchFormat, schemaFormat: DataSchemaFormat, limit: Int): DataQueryResult! + """ + Lists engines known to the system and recommended for use + """ + knownEngines: [EngineDesc!]! } union DataQueryResult = DataQueryResultSuccess | DataQueryResultError @@ -179,7 +184,8 @@ type Dataset { id: DatasetID! """ Symbolic name of the dataset. - Name can change over the dataset's lifetime. For unique identifier use `id()`. + Name can change over the dataset's lifetime. For unique identifier use + `id()`. """ name: DatasetName! """ @@ -230,12 +236,14 @@ type DatasetData { """ numRecordsTotal: Int! """ - An estimated size of data on disk not accounting for replication or caching + An estimated size of data on disk not accounting for replication or + caching """ estimatedSize: Int! """ Returns the specified number of the latest records in the dataset - This is equivalent to the SQL query: `SELECT * FROM dataset ORDER BY event_time DESC LIMIT N` + This is equivalent to the SQL query: `SELECT * FROM dataset ORDER BY + event_time DESC LIMIT N` """ tail(limit: Int, dataFormat: DataBatchFormat, schemaFormat: DataSchemaFormat): DataQueryResult! } @@ -285,7 +293,8 @@ type DatasetMetadata { """ currentInfo: SetInfo! """ - Current readme file as discovered from attachments associated with the dataset + Current readme file as discovered from attachments associated with the + dataset """ currentReadme: String """ @@ -334,6 +343,27 @@ The input/output is a string in RFC3339 format. """ scalar DateTime +""" +Describes +""" +type EngineDesc { + """ + A short name of the engine, e.g. "Spark", "Flink". + Intended for use in UI for quick engine identification and selection. + """ + name: String! + """ + Language and dialect this engine is using for queries + Indended for configuring code highlighting and completions. + """ + dialect: QueryDialect! + """ + OCI image repository and a tag of the latest engine image, e.g. + "ghcr.io/kamu-data/engine-datafusion:0.1.2" + """ + latestImage: String! +} + type EnvVar { name: String! value: String @@ -453,7 +483,8 @@ type MetadataChain { """ blockByHash(hash: Multihash!): MetadataBlockExtended """ - Returns a metadata block corresponding to the specified hash and encoded in desired format + Returns a metadata block corresponding to the specified hash and encoded + in desired format """ blockByHashEncoded(hash: Multihash!, format: MetadataManifestFormat!): String """ @@ -484,6 +515,7 @@ scalar Multihash type Mutation { auth: Auth! + tasks: TasksMutations! } type OffsetInterval { @@ -516,7 +548,8 @@ type PageBasedInfo { """ currentPage: Int! """ - Approximate number of total pages assuming number of nodes per page stays the same + Approximate number of total pages assuming number of nodes per page + stays the same """ totalPages: Int } @@ -538,15 +571,29 @@ type Query { """ apiVersion: String! """ - Dataset-related functionality group + Dataset-related functionality group. + + Datasets are historical streams of events recorded under a cetrain + schema. """ datasets: Datasets! """ - Account-related functionality group + Account-related functionality group. + + Accounts can be individual users or organizations registered in the + system. This groups deals with their identities and permissions. """ accounts: Accounts! """ - Search-related functionality group + Task-related functionality group. + + Tasks are units of scheduling that can perform many functions like + ingesting new data, running dataset transformations, answering ad-hoc + queries etc. + """ + tasks: Tasks! + """ + Search-related functionality group. """ search: Search! """ @@ -556,7 +603,9 @@ type Query { } enum QueryDialect { - DATA_FUSION + SQL_SPARK + SQL_FLINK + SQL_DATA_FUSION } union ReadStep = ReadStepCsv | ReadStepJsonLines | ReadStepGeoJson | ReadStepEsriShapefile | ReadStepParquet @@ -694,12 +743,141 @@ enum SourceOrdering { BY_NAME } +type SourceState { + kind: String! + source: String! + value: String! +} + type SqlQueryStep { alias: String query: String! } +type Task { + """ + Unique and stable identitfier of this task + """ + taskId: TaskID! + """ + Life-cycle status of a task + """ + status: TaskStatus! + """ + Whether the task was ordered to be cancelled + """ + cancellationRequested: Boolean! + """ + Describes a certain final outcome of the task once it reaches the + "finished" status + """ + outcome: TaskOutcome + """ + Time when task was originally created and placed in a queue + """ + createdAt: DateTime! + """ + Time when task transitioned into a running state + """ + ranAt: DateTime + """ + Time when cancellation of task was requested + """ + cancellationRequestedAt: DateTime + """ + Time when task has reached a final outcome + """ + finishedAt: DateTime +} + +type TaskConnection { + """ + A shorthand for `edges { node { ... } }` + """ + nodes: [Task!]! + """ + Approximate number of total nodes + """ + totalCount: Int! + """ + Page information + """ + pageInfo: PageBasedInfo! + edges: [TaskEdge!]! +} + +type TaskEdge { + node: Task! +} + +scalar TaskID + +""" +Describes a certain final outcome of the task +""" +enum TaskOutcome { + """ + Task succeeded + """ + SUCCESS + """ + Task failed to complete + """ + FAILED + """ + Task was cancelled by a user + """ + CANCELLED +} + +""" +Life-cycle status of a task +""" +enum TaskStatus { + """ + Task is waiting for capacity to be allocated to it + """ + QUEUED + """ + Task is being executed + """ + RUNNING + """ + Task has reached a certain final outcome (see [TaskOutcome]) + """ + FINISHED +} + +type Tasks { + """ + Returns current state of a given task + """ + getTask(taskId: TaskID!): Task + """ + Returns states of tasks associated with a given dataset ordered by + creation time from newest to oldest + """ + listTasksByDataset(datasetId: DatasetID!, page: Int, perPage: Int): TaskConnection! +} + +type TasksMutations { + """ + Requests cancellation of the specified task + """ + cancelTask(taskId: TaskID!): Task! + """ + Schedules a task to update the specified dataset by performing polling + ingest or a derivative transformation + """ + createUpdateDatasetTask(datasetId: DatasetID!): Task! + """ + Schedules a task to update the specified dataset by performing polling + ingest or a derivative transformation + """ + createProbeTask(datasetId: DatasetID, busyTimeMs: Int, endWithOutcome: TaskOutcome): Task! +} + type TemporalTable { name: String! primaryKey: [String!]! diff --git a/src/app/api/dataset.api.ts b/src/app/api/dataset.api.ts index f951cdfaf..8e67e7704 100644 --- a/src/app/api/dataset.api.ts +++ b/src/app/api/dataset.api.ts @@ -5,6 +5,8 @@ import { CreateDatasetFromSnapshotQuery, CreateEmptyDatasetQuery, DatasetKind, + GetDatasetSchemaGQL, + GetDatasetSchemaQuery, } from "src/app/api/kamu.graphql.interface"; import AppValues from "src/app/common/app.values"; import { ApolloQueryResult } from "@apollo/client/core"; @@ -41,6 +43,7 @@ export class DatasetApi { private createEmptyDatasetGQL: CreateEmptyDatasetGQL, private createDatasetFromSnapshotGQL: CreateDatasetFromSnapshotGQL, private commitEventToDataset: CommitEventToDatasetGQL, + private datasetSchemaGQL: GetDatasetSchemaGQL, ) {} public getDatasetMainData(params: { @@ -97,6 +100,21 @@ export class DatasetApi { ); } + public getDatasetSchema( + datasetId: string, + ): Observable { + return this.datasetSchemaGQL + .watch({ + datasetId, + }) + .valueChanges.pipe( + first(), + map((result: ApolloQueryResult) => { + return result.data; + }), + ); + } + public fetchDatasetsByAccountName( accountName: string, page = 0, diff --git a/src/app/api/engine.api.spec.ts b/src/app/api/engine.api.spec.ts new file mode 100644 index 000000000..304a9a827 --- /dev/null +++ b/src/app/api/engine.api.spec.ts @@ -0,0 +1,20 @@ +import { TestBed } from "@angular/core/testing"; +import { ApolloTestingModule } from "apollo-angular/testing"; +import { EngineApi } from "./engine.api"; +import { Apollo, ApolloModule } from "apollo-angular"; + +describe("EngineApi", () => { + let service: EngineApi; + + beforeEach(() => { + TestBed.configureTestingModule({ + providers: [EngineApi, Apollo], + imports: [ApolloModule, ApolloTestingModule], + }); + service = TestBed.inject(EngineApi); + }); + + it("should be created", () => { + expect(service).toBeTruthy(); + }); +}); diff --git a/src/app/api/engine.api.ts b/src/app/api/engine.api.ts new file mode 100644 index 000000000..6efc13454 --- /dev/null +++ b/src/app/api/engine.api.ts @@ -0,0 +1,21 @@ +import { Injectable } from "@angular/core"; +import { EnginesGQL, EnginesQuery } from "./kamu.graphql.interface"; +import { ApolloQueryResult } from "@apollo/client"; +import { Observable } from "rxjs"; +import { first, map } from "rxjs/operators"; + +@Injectable({ + providedIn: "root", +}) +export class EngineApi { + constructor(private enginesGQL: EnginesGQL) {} + + public getEngines(): Observable { + return this.enginesGQL.watch().valueChanges.pipe( + first(), + map((result: ApolloQueryResult) => { + return result.data; + }), + ); + } +} diff --git a/src/app/api/gql/dataset-data-sql-run.graphql b/src/app/api/gql/dataset-data-sql-run.graphql index 793381b29..5a6fae2e8 100644 --- a/src/app/api/gql/dataset-data-sql-run.graphql +++ b/src/app/api/gql/dataset-data-sql-run.graphql @@ -2,7 +2,7 @@ query getDatasetDataSQLRun($query: String!, $limit: Int!) { data { query( query: $query - queryDialect: DATA_FUSION + queryDialect: SQL_DATA_FUSION schemaFormat: PARQUET_JSON dataFormat: JSON limit: $limit diff --git a/src/app/api/gql/dataset-schema.graphql b/src/app/api/gql/dataset-schema.graphql new file mode 100644 index 000000000..1e75494f8 --- /dev/null +++ b/src/app/api/gql/dataset-schema.graphql @@ -0,0 +1,13 @@ +query getDatasetSchema($datasetId: DatasetID!) { + datasets { + byId(datasetId: $datasetId) { + ...DatasetBasics + metadata { + currentSchema(format: PARQUET_JSON) { + format + content + } + } + } + } +} diff --git a/src/app/api/gql/engines.graphql b/src/app/api/gql/engines.graphql new file mode 100644 index 000000000..447c9ad36 --- /dev/null +++ b/src/app/api/gql/engines.graphql @@ -0,0 +1,9 @@ +query engines { + data { + knownEngines { + name + dialect + latestImage + } + } +} diff --git a/src/app/api/gql/fragments/fragment-dataset-overview.graphql b/src/app/api/gql/fragments/fragment-dataset-overview.graphql index 74f7980c5..c70b0b217 100644 --- a/src/app/api/gql/fragments/fragment-dataset-overview.graphql +++ b/src/app/api/gql/fragments/fragment-dataset-overview.graphql @@ -7,5 +7,8 @@ fragment DatasetOverview on Dataset { currentSource { __typename } + currentTransform { + __typename + } } } diff --git a/src/app/api/kamu.graphql.interface.ts b/src/app/api/kamu.graphql.interface.ts index d61c89f48..c8293fd5a 100644 --- a/src/app/api/kamu.graphql.interface.ts +++ b/src/app/api/kamu.graphql.interface.ts @@ -31,6 +31,7 @@ export type Scalars = { */ DateTime: any; Multihash: any; + TaskID: any; }; export type AccessToken = { @@ -74,8 +75,9 @@ export type AddData = { __typename?: "AddData"; inputCheckpoint?: Maybe; outputCheckpoint?: Maybe; - outputData: DataSlice; + outputData?: Maybe; outputWatermark?: Maybe; + sourceState?: Maybe; }; export type AttachmentEmbedded = { @@ -195,6 +197,8 @@ export enum DataBatchFormat { export type DataQueries = { __typename?: "DataQueries"; + /** Lists engines known to the system and recommended for use */ + knownEngines: Array; /** Executes a specified query and returns its result */ query: DataQueryResult; }; @@ -262,7 +266,8 @@ export type Dataset = { metadata: DatasetMetadata; /** * Symbolic name of the dataset. - * Name can change over the dataset's lifetime. For unique identifier use `id()`. + * Name can change over the dataset's lifetime. For unique identifier use + * `id()`. */ name: Scalars["DatasetName"]; /** Returns the user or organization that owns this dataset */ @@ -282,13 +287,17 @@ export type DatasetConnection = { export type DatasetData = { __typename?: "DatasetData"; - /** An estimated size of data on disk not accounting for replication or caching */ + /** + * An estimated size of data on disk not accounting for replication or + * caching + */ estimatedSize: Scalars["Int"]; /** Total number of records in this dataset */ numRecordsTotal: Scalars["Int"]; /** * Returns the specified number of the latest records in the dataset - * This is equivalent to the SQL query: `SELECT * FROM dataset ORDER BY event_time DESC LIMIT N` + * This is equivalent to the SQL query: `SELECT * FROM dataset ORDER BY + * event_time DESC LIMIT N` */ tail: DataQueryResult; }; @@ -319,7 +328,10 @@ export type DatasetMetadata = { currentInfo: SetInfo; /** Current license associated with the dataset */ currentLicense?: Maybe; - /** Current readme file as discovered from attachments associated with the dataset */ + /** + * Current readme file as discovered from attachments associated with the + * dataset + */ currentReadme?: Maybe; /** Latest data schema */ currentSchema?: Maybe; @@ -388,6 +400,26 @@ export type DatasetsCreateFromSnapshotArgs = { snapshotFormat: MetadataManifestFormat; }; +/** Describes */ +export type EngineDesc = { + __typename?: "EngineDesc"; + /** + * Language and dialect this engine is using for queries + * Indended for configuring code highlighting and completions. + */ + dialect: QueryDialect; + /** + * OCI image repository and a tag of the latest engine image, e.g. + * "ghcr.io/kamu-data/engine-datafusion:0.1.2" + */ + latestImage: Scalars["String"]; + /** + * A short name of the engine, e.g. "Spark", "Flink". + * Intended for use in UI for quick engine identification and selection. + */ + name: Scalars["String"]; +}; + export type EnvVar = { __typename?: "EnvVar"; name: Scalars["String"]; @@ -512,7 +544,10 @@ export type MetadataChain = { __typename?: "MetadataChain"; /** Returns a metadata block corresponding to the specified hash */ blockByHash?: Maybe; - /** Returns a metadata block corresponding to the specified hash and encoded in desired format */ + /** + * Returns a metadata block corresponding to the specified hash and encoded + * in desired format + */ blockByHashEncoded?: Maybe; /** Iterates all metadata blocks in the reverse chronological order */ blocks: MetadataBlockConnection; @@ -572,6 +607,7 @@ export type MetadataManifestUnsupportedVersion = CommitResult & export type Mutation = { __typename?: "Mutation"; auth: Auth; + tasks: TasksMutations; }; export type OffsetInterval = { @@ -596,7 +632,10 @@ export type PageBasedInfo = { hasNextPage: Scalars["Boolean"]; /** When paginating backwards, are there more items? */ hasPreviousPage: Scalars["Boolean"]; - /** Approximate number of total pages assuming number of nodes per page stays the same */ + /** + * Approximate number of total pages assuming number of nodes per page + * stays the same + */ totalPages?: Maybe; }; @@ -615,20 +654,40 @@ export type PrepStepPipe = { export type Query = { __typename?: "Query"; - /** Account-related functionality group */ + /** + * Account-related functionality group. + * + * Accounts can be individual users or organizations registered in the + * system. This groups deals with their identities and permissions. + */ accounts: Accounts; /** Returns the version of the GQL API */ apiVersion: Scalars["String"]; /** Querying and data manipulations */ data: DataQueries; - /** Dataset-related functionality group */ + /** + * Dataset-related functionality group. + * + * Datasets are historical streams of events recorded under a cetrain + * schema. + */ datasets: Datasets; - /** Search-related functionality group */ + /** Search-related functionality group. */ search: Search; + /** + * Task-related functionality group. + * + * Tasks are units of scheduling that can perform many functions like + * ingesting new data, running dataset transformations, answering ad-hoc + * queries etc. + */ + tasks: Tasks; }; export enum QueryDialect { - DataFusion = "DATA_FUSION", + SqlDataFusion = "SQL_DATA_FUSION", + SqlFlink = "SQL_FLINK", + SqlSpark = "SQL_SPARK", } export type ReadStep = @@ -787,12 +846,129 @@ export enum SourceOrdering { ByName = "BY_NAME", } +export type SourceState = { + __typename?: "SourceState"; + kind: Scalars["String"]; + source: Scalars["String"]; + value: Scalars["String"]; +}; + export type SqlQueryStep = { __typename?: "SqlQueryStep"; alias?: Maybe; query: Scalars["String"]; }; +export type Task = { + __typename?: "Task"; + /** Whether the task was ordered to be cancelled */ + cancellationRequested: Scalars["Boolean"]; + /** Time when cancellation of task was requested */ + cancellationRequestedAt?: Maybe; + /** Time when task was originally created and placed in a queue */ + createdAt: Scalars["DateTime"]; + /** Time when task has reached a final outcome */ + finishedAt?: Maybe; + /** + * Describes a certain final outcome of the task once it reaches the + * "finished" status + */ + outcome?: Maybe; + /** Time when task transitioned into a running state */ + ranAt?: Maybe; + /** Life-cycle status of a task */ + status: TaskStatus; + /** Unique and stable identitfier of this task */ + taskId: Scalars["TaskID"]; +}; + +export type TaskConnection = { + __typename?: "TaskConnection"; + edges: Array; + /** A shorthand for `edges { node { ... } }` */ + nodes: Array; + /** Page information */ + pageInfo: PageBasedInfo; + /** Approximate number of total nodes */ + totalCount: Scalars["Int"]; +}; + +export type TaskEdge = { + __typename?: "TaskEdge"; + node: Task; +}; + +/** Describes a certain final outcome of the task */ +export enum TaskOutcome { + /** Task was cancelled by a user */ + Cancelled = "CANCELLED", + /** Task failed to complete */ + Failed = "FAILED", + /** Task succeeded */ + Success = "SUCCESS", +} + +/** Life-cycle status of a task */ +export enum TaskStatus { + /** Task has reached a certain final outcome (see [TaskOutcome]) */ + Finished = "FINISHED", + /** Task is waiting for capacity to be allocated to it */ + Queued = "QUEUED", + /** Task is being executed */ + Running = "RUNNING", +} + +export type Tasks = { + __typename?: "Tasks"; + /** Returns current state of a given task */ + getTask?: Maybe; + /** + * Returns states of tasks associated with a given dataset ordered by + * creation time from newest to oldest + */ + listTasksByDataset: TaskConnection; +}; + +export type TasksGetTaskArgs = { + taskId: Scalars["TaskID"]; +}; + +export type TasksListTasksByDatasetArgs = { + datasetId: Scalars["DatasetID"]; + page?: InputMaybe; + perPage?: InputMaybe; +}; + +export type TasksMutations = { + __typename?: "TasksMutations"; + /** Requests cancellation of the specified task */ + cancelTask: Task; + /** + * Schedules a task to update the specified dataset by performing polling + * ingest or a derivative transformation + */ + createProbeTask: Task; + /** + * Schedules a task to update the specified dataset by performing polling + * ingest or a derivative transformation + */ + createUpdateDatasetTask: Task; +}; + +export type TasksMutationsCancelTaskArgs = { + taskId: Scalars["TaskID"]; +}; + +export type TasksMutationsCreateProbeTaskArgs = { + busyTimeMs?: InputMaybe; + datasetId?: InputMaybe; + endWithOutcome?: InputMaybe; +}; + +export type TasksMutationsCreateUpdateDatasetTaskArgs = { + datasetId: Scalars["DatasetID"]; +}; + export type TemporalTable = { __typename?: "TemporalTable"; name: Scalars["String"]; @@ -1007,6 +1183,30 @@ export type GetDatasetMainDataQuery = { }; }; +export type GetDatasetSchemaQueryVariables = Exact<{ + datasetId: Scalars["DatasetID"]; +}>; + +export type GetDatasetSchemaQuery = { + __typename?: "Query"; + datasets: { + __typename?: "Datasets"; + byId?: + | ({ + __typename?: "Dataset"; + metadata: { + __typename?: "DatasetMetadata"; + currentSchema?: { + __typename?: "DataSchema"; + format: DataSchemaFormat; + content: string; + } | null; + }; + } & DatasetBasicsFragment) + | null; + }; +}; + export type DatasetsByAccountNameQueryVariables = Exact<{ accountName: Scalars["AccountName"]; perPage?: InputMaybe; @@ -1030,17 +1230,32 @@ export type DatasetsByAccountNameQuery = { }; }; +export type EnginesQueryVariables = Exact<{ [key: string]: never }>; + +export type EnginesQuery = { + __typename?: "Query"; + data: { + __typename?: "DataQueries"; + knownEngines: Array<{ + __typename?: "EngineDesc"; + name: string; + dialect: QueryDialect; + latestImage: string; + }>; + }; +}; + export type AddDataEventFragment = { __typename?: "AddData"; inputCheckpoint?: any | null; addDataWatermark?: any | null; - outputData: { + outputData?: { __typename?: "DataSlice"; logicalHash: any; physicalHash: any; size: number; interval: { __typename?: "OffsetInterval"; start: number; end: number }; - }; + } | null; outputCheckpoint?: { __typename?: "Checkpoint"; physicalHash: any; @@ -1468,6 +1683,7 @@ export type DatasetOverviewFragment = { metadata: { __typename?: "DatasetMetadata"; currentSource?: { __typename: "SetPollingSource" } | null; + currentTransform?: { __typename: "SetTransform" } | null; }; } & DatasetDescriptionFragment & DatasetDetailsFragment & @@ -2187,6 +2403,9 @@ export const DatasetOverviewFragmentDoc = gql` currentSource { __typename } + currentTransform { + __typename + } } } ${DatasetDescriptionFragmentDoc} @@ -2359,7 +2578,7 @@ export const GetDatasetDataSqlRunDocument = gql` data { query( query: $query - queryDialect: DATA_FUSION + queryDialect: SQL_DATA_FUSION schemaFormat: PARQUET_JSON dataFormat: JSON limit: $limit @@ -2478,6 +2697,36 @@ export class GetDatasetMainDataGQL extends Apollo.Query< super(apollo); } } +export const GetDatasetSchemaDocument = gql` + query getDatasetSchema($datasetId: DatasetID!) { + datasets { + byId(datasetId: $datasetId) { + ...DatasetBasics + metadata { + currentSchema(format: PARQUET_JSON) { + format + content + } + } + } + } + } + ${DatasetBasicsFragmentDoc} +`; + +@Injectable({ + providedIn: "root", +}) +export class GetDatasetSchemaGQL extends Apollo.Query< + GetDatasetSchemaQuery, + GetDatasetSchemaQueryVariables +> { + document = GetDatasetSchemaDocument; + + constructor(apollo: Apollo.Apollo) { + super(apollo); + } +} export const DatasetsByAccountNameDocument = gql` query datasetsByAccountName( $accountName: AccountName! @@ -2518,6 +2767,31 @@ export class DatasetsByAccountNameGQL extends Apollo.Query< super(apollo); } } +export const EnginesDocument = gql` + query engines { + data { + knownEngines { + name + dialect + latestImage + } + } + } +`; + +@Injectable({ + providedIn: "root", +}) +export class EnginesGQL extends Apollo.Query< + EnginesQuery, + EnginesQueryVariables +> { + document = EnginesDocument; + + constructor(apollo: Apollo.Apollo) { + super(apollo); + } +} export const GithubLoginDocument = gql` mutation GithubLogin($code: String!) { auth { diff --git a/src/app/app-routing.module.ts b/src/app/app-routing.module.ts index b9cd97957..372daa6ab 100644 --- a/src/app/app-routing.module.ts +++ b/src/app/app-routing.module.ts @@ -13,6 +13,7 @@ import { AccountComponent } from "./auth/account/account.component"; import { GithubCallbackComponent } from "./auth/github-callback/github.callback"; import { environment } from "../environments/environment"; import ProjectLinks from "./project-links"; +import { SetTransformComponent } from "./dataset-view/additional-components/metadata-component/components/set-transform/set-transform.component"; const githubUrl = `https://github.com/login/oauth/authorize?scope=user:email&client_id=${environment.github_client_id}`; @@ -78,6 +79,12 @@ export const routes: Routes = [ `/${ProjectLinks.URL_PARAM_ADD_POLLING_SOURCE}`, component: AddPollingSourceComponent, }, + { + path: + `:${ProjectLinks.URL_PARAM_ACCOUNT_NAME}/:${ProjectLinks.URL_PARAM_DATASET_NAME}` + + `/${ProjectLinks.URL_PARAM_SET_TRANSFORM}`, + component: SetTransformComponent, + }, { path: "**", component: PageNotFoundComponent, diff --git a/src/app/app.module.ts b/src/app/app.module.ts index 9a21d8354..e6ad8736c 100644 --- a/src/app/app.module.ts +++ b/src/app/app.module.ts @@ -66,7 +66,6 @@ import { DatasetsTabComponent } from "./auth/account/additional-components/datas import { ClipboardModule } from "@angular/cdk/clipboard"; import { HighlightModule, HIGHLIGHT_OPTIONS } from "ngx-highlightjs"; import { ToastrModule } from "ngx-toastr"; -import { AddPollingSourceComponent } from "./dataset-view/additional-components/metadata-component/components/add-polling-source/add-polling-source.component"; const Services = [ { @@ -137,7 +136,6 @@ const MatModules = [ NotificationIndicatorComponent, SettingsComponent, DatasetsTabComponent, - AddPollingSourceComponent, ], imports: [ AppRoutingModule, diff --git a/src/app/common/app.helpers.ts b/src/app/common/app.helpers.ts index bdaf4aaa7..bc9f48640 100644 --- a/src/app/common/app.helpers.ts +++ b/src/app/common/app.helpers.ts @@ -1,5 +1,7 @@ import moment from "moment"; import { MaybeNull } from "./app.types"; +import { GetDatasetSchemaQuery } from "../api/kamu.graphql.interface"; +import { DatasetSchema } from "../interface/dataset.interface"; export function requireValue(input: MaybeNull) { if (input === null) throw Error("value is required!"); @@ -57,3 +59,13 @@ export function momentConvertDatetoLocalWithFormat(dateParams: { return moment(ISOStringDate).format(dateParams.format); } + +export function parseCurrentSchema( + data: GetDatasetSchemaQuery, +): MaybeNull { + return data.datasets.byId?.metadata.currentSchema + ? (JSON.parse( + data.datasets.byId.metadata.currentSchema.content, + ) as DatasetSchema) + : null; +} diff --git a/src/app/common/base-yaml-event.service.ts b/src/app/common/base-yaml-event.service.ts new file mode 100644 index 000000000..eae72c7da --- /dev/null +++ b/src/app/common/base-yaml-event.service.ts @@ -0,0 +1,89 @@ +import { inject } from "@angular/core"; +import { DatasetService } from "../dataset-view/dataset.service"; +import { Observable, EMPTY, iif, of, zip, Subject } from "rxjs"; +import { expand, last, map, switchMap } from "rxjs/operators"; +import { + DatasetKind, + MetadataBlockFragment, +} from "../api/kamu.graphql.interface"; +import { BlockService } from "../dataset-block/metadata-block/block.service"; +import { SupportedEvents } from "../dataset-block/metadata-block/components/event-details/supported.events"; +import { DatasetHistoryUpdate } from "../dataset-view/dataset.subscriptions.interface"; +import { DatasetInfo } from "../interface/navigation.interface"; + +export abstract class BaseYamlEventService { + private appDatasetService = inject(DatasetService); + private blockService = inject(BlockService); + private currentPage = 0; + private readonly historyPageSize = 100; + public history: DatasetHistoryUpdate; + private kindChanges$: Subject = new Subject(); + public changeKindChanges(data: DatasetKind): void { + this.kindChanges$.next(data); + } + public get onKindChanges(): Observable { + return this.kindChanges$.asObservable(); + } + + public getEventAsYaml( + info: DatasetInfo, + typename: SupportedEvents, + ): Observable { + return this.appDatasetService + .getDatasetHistory(info, this.historyPageSize, this.currentPage) + .pipe( + expand((h: DatasetHistoryUpdate) => { + const filteredHistory = this.filterHistoryByType( + h.history, + typename, + ); + return filteredHistory.length === 0 && + h.pageInfo.hasNextPage + ? this.appDatasetService.getDatasetHistory( + info, + this.historyPageSize, + h.pageInfo.currentPage + 1, + ) + : EMPTY; + }), + map((h: DatasetHistoryUpdate) => { + if (h.kind) { + this.changeKindChanges(h.kind); + } + this.history = h; + const filteredHistory = this.filterHistoryByType( + h.history, + typename, + ); + return filteredHistory; + }), + switchMap((filteredHistory: MetadataBlockFragment[]) => + iif( + () => !filteredHistory.length, + of(null), + zip( + this.blockService.onMetadataBlockAsYamlChanges, + this.blockService.requestMetadataBlock( + info, + filteredHistory[0]?.blockHash as string, + ), + ), + ), + ), + map((result: [string, unknown] | null) => { + if (result) return result[0]; + else return null; + }), + last(), + ); + } + + private filterHistoryByType( + history: MetadataBlockFragment[], + typename: string, + ): MetadataBlockFragment[] { + return history.filter( + (item: MetadataBlockFragment) => item.event.__typename === typename, + ); + } +} diff --git a/src/app/common/data.helpers.ts b/src/app/common/data.helpers.ts index 4c2bbbcf8..adc94b65b 100644 --- a/src/app/common/data.helpers.ts +++ b/src/app/common/data.helpers.ts @@ -39,6 +39,12 @@ export class DataHelpers { name: "Apache Spark", url_logo: "assets/images/apache-spark.png", }; + + case "datafusion": + return { + name: "DataFusion", + url_logo: "assets/images/datafusion-logo.png", + }; default: console.log("Engine is not defined"); return { @@ -131,9 +137,11 @@ export class DataHelpers { switch (event.__typename) { case "AddData": return `Added ${ - event.outputData.interval.end - - event.outputData.interval.start + - 1 + event.outputData + ? event.outputData.interval.end - + event.outputData.interval.start + + 1 + : 0 } new records`; case "ExecuteQuery": return `Transformation produced ${ diff --git a/src/app/dataset-block/metadata-block/components/event-details/components/common/sql-query-viewer/sql-query-viewer.component.ts b/src/app/dataset-block/metadata-block/components/event-details/components/common/sql-query-viewer/sql-query-viewer.component.ts index ef7249d66..466f7cb24 100644 --- a/src/app/dataset-block/metadata-block/components/event-details/components/common/sql-query-viewer/sql-query-viewer.component.ts +++ b/src/app/dataset-block/metadata-block/components/event-details/components/common/sql-query-viewer/sql-query-viewer.component.ts @@ -1,6 +1,5 @@ import { SqlQueryStep } from "../../../../../../../api/kamu.graphql.interface"; import { ChangeDetectionStrategy, Component, Input } from "@angular/core"; -import { sqlEditorOptionsForEvents } from "../../../config-editor.events"; import { BasePropertyComponent } from "../base-property/base-property.component"; @Component({ @@ -11,5 +10,4 @@ import { BasePropertyComponent } from "../base-property/base-property.component" }) export class SqlQueryViewerComponent extends BasePropertyComponent { @Input() public data: SqlQueryStep[]; - public sqlEditorOptions = sqlEditorOptionsForEvents; } diff --git a/src/app/dataset-block/metadata-block/components/event-details/config-editor.events.ts b/src/app/dataset-block/metadata-block/components/event-details/config-editor.events.ts index d39251af1..f487844cb 100644 --- a/src/app/dataset-block/metadata-block/components/event-details/config-editor.events.ts +++ b/src/app/dataset-block/metadata-block/components/event-details/config-editor.events.ts @@ -1,14 +1,11 @@ import * as monaco from "monaco-editor"; -export const sqlEditorOptionsForEvents: monaco.editor.IStandaloneEditorConstructionOptions = +export const sqlEditorOptions: monaco.editor.IStandaloneEditorConstructionOptions = { theme: "vs", language: "sql", - contextmenu: false, - wordWrap: "on", - readOnly: true, renderLineHighlight: "none", - lineNumbers: "off", minimap: { enabled: false, }, + scrollBeyondLastLine: false, }; diff --git a/src/app/dataset-create/dataset-create.component.html b/src/app/dataset-create/dataset-create.component.html index a34c2f03f..256658bd7 100644 --- a/src/app/dataset-create/dataset-create.component.html +++ b/src/app/dataset-create/dataset-create.component.html @@ -125,7 +125,7 @@

Create a new dataset

> Query: - +
+ +

Error:

{{ sqlErrorMarker }}

diff --git a/src/app/dataset-view/additional-components/data-component/data-component.ts b/src/app/dataset-view/additional-components/data-component/data-component.ts index 6d69f771f..c087b7a93 100644 --- a/src/app/dataset-view/additional-components/data-component/data-component.ts +++ b/src/app/dataset-view/additional-components/data-component/data-component.ts @@ -21,6 +21,7 @@ import { BaseComponent } from "src/app/common/base.component"; import { DatasetBasicsFragment } from "src/app/api/kamu.graphql.interface"; import * as monaco from "monaco-editor"; import { MaybeNull } from "src/app/common/app.types"; +import { sqlEditorOptions } from "src/app/dataset-block/metadata-block/components/event-details/config-editor.events"; @Component({ selector: "app-data", @@ -30,14 +31,7 @@ import { MaybeNull } from "src/app/common/app.types"; export class DataComponent extends BaseComponent implements OnInit { @Input() public datasetBasics?: DatasetBasicsFragment; @Output() public runSQLRequestEmit = new EventEmitter(); - public sqlEditorOptions = { - theme: "vs", - language: "sql", - renderLineHighlight: "none", - minimap: { - enabled: false, - }, - }; + public sqlEditorOptions = sqlEditorOptions; public savedQueries = DataTabValues.savedQueries; public sqlRequestCode = `select\n *\nfrom `; diff --git a/src/app/dataset-view/additional-components/metadata-component/components/add-polling-source/add-polling-source.component.html b/src/app/dataset-view/additional-components/metadata-component/components/add-polling-source/add-polling-source.component.html index c93dc6750..0a3eb8bda 100644 --- a/src/app/dataset-view/additional-components/metadata-component/components/add-polling-source/add-polling-source.component.html +++ b/src/app/dataset-view/additional-components/metadata-component/components/add-polling-source/add-polling-source.component.html @@ -1,4 +1,4 @@ -
+
+ +
There is no metadata. >

There is no metadata. Need add - SetTransform event + SetTransform event

diff --git a/src/app/dataset-view/additional-components/overview-component/overview-component.html b/src/app/dataset-view/additional-components/overview-component/overview-component.html index 4f27ee605..63a7a3934 100644 --- a/src/app/dataset-view/additional-components/overview-component/overview-component.html +++ b/src/app/dataset-view/additional-components/overview-component/overview-component.html @@ -89,6 +89,20 @@

>

+
+

+ You can + + Add transformation +

+
= data.datasets - .byOwnerAndName.metadata.currentSchema - ? (JSON.parse( - data.datasets.byOwnerAndName.metadata - .currentSchema.content, - ) as DatasetSchema) - : null; - + const schema: MaybeNull = + parseCurrentSchema(data); this.datasetUpdate(data.datasets.byOwnerAndName); this.overviewTabDataUpdate( data.datasets.byOwnerAndName, @@ -157,6 +153,7 @@ export class DatasetService { history: data.datasets.byOwnerAndName.metadata.chain .blocks.nodes as MetadataBlockFragment[], pageInfo, + kind: data.datasets.byOwnerAndName.kind, }; return historyUpdate; } else { @@ -207,6 +204,12 @@ export class DatasetService { return this.datasetApi.getDatasetInfoById(datasetId); } + public requestDatasetSchema( + datasetId: string, + ): Observable { + return this.datasetApi.getDatasetSchema(datasetId); + } + private datasetUpdate(data: DatasetBasicsFragment): void { const dataset: DatasetBasicsFragment = data; this.datasetChanges(dataset); diff --git a/src/app/dataset-view/dataset.subscriptions.interface.ts b/src/app/dataset-view/dataset.subscriptions.interface.ts index 2547d009a..9df8821c3 100644 --- a/src/app/dataset-view/dataset.subscriptions.interface.ts +++ b/src/app/dataset-view/dataset.subscriptions.interface.ts @@ -1,4 +1,5 @@ import { + DatasetKind, DatasetPageInfoFragment, SetVocab, } from "./../api/kamu.graphql.interface"; @@ -38,6 +39,7 @@ export interface MetadataSchemaUpdate { export interface DatasetHistoryUpdate { history: MetadataBlockFragment[]; pageInfo: DatasetPageInfoFragment; + kind?: DatasetKind; } export interface LineageUpdate { diff --git a/src/app/project-links.ts b/src/app/project-links.ts index c741c3a83..1d169abd4 100644 --- a/src/app/project-links.ts +++ b/src/app/project-links.ts @@ -29,6 +29,7 @@ export default class ProjectLinks { public static readonly URL_PARAM_BLOCK_HASH: string = "blockHash"; public static readonly URL_PARAM_ADD_POLLING_SOURCE: string = "add-polling-source"; + public static readonly URL_PARAM_SET_TRANSFORM: string = "set-transform"; public static readonly URL_QUERY_PARAM_TAB: string = "tab"; public static readonly URL_QUERY_PARAM_PAGE: string = "page"; diff --git a/src/app/services/engine.service.spec.ts b/src/app/services/engine.service.spec.ts new file mode 100644 index 000000000..29f466d95 --- /dev/null +++ b/src/app/services/engine.service.spec.ts @@ -0,0 +1,20 @@ +import { TestBed } from "@angular/core/testing"; +import { EngineService } from "./engine.service"; +import { Apollo, ApolloModule } from "apollo-angular"; +import { ApolloTestingModule } from "apollo-angular/testing"; + +describe("EngineService", () => { + let service: EngineService; + + beforeEach(() => { + TestBed.configureTestingModule({ + providers: [Apollo], + imports: [ApolloModule, ApolloTestingModule], + }); + service = TestBed.inject(EngineService); + }); + + it("should be created", () => { + expect(service).toBeTruthy(); + }); +}); diff --git a/src/app/services/engine.service.ts b/src/app/services/engine.service.ts new file mode 100644 index 000000000..c2e61ab87 --- /dev/null +++ b/src/app/services/engine.service.ts @@ -0,0 +1,15 @@ +import { Injectable } from "@angular/core"; +import { EngineApi } from "../api/engine.api"; +import { Observable } from "rxjs"; +import { EnginesQuery } from "../api/kamu.graphql.interface"; + +@Injectable({ + providedIn: "root", +}) +export class EngineService { + constructor(private engineApi: EngineApi) {} + + public engines(): Observable { + return this.engineApi.getEngines(); + } +} diff --git a/src/app/services/navigation.service.ts b/src/app/services/navigation.service.ts index a329cdc05..aa24214fc 100644 --- a/src/app/services/navigation.service.ts +++ b/src/app/services/navigation.service.ts @@ -69,6 +69,16 @@ export class NavigationService { ); } + public navigateToSetTransform(params: DatasetInfo): void { + promiseWithCatch( + this.router.navigate([ + params.accountName, + params.datasetName, + ProjectLinks.URL_PARAM_SET_TRANSFORM, + ]), + ); + } + public navigateToDatasetView(params: DatasetNavigationParams): void { promiseWithCatch( this.router.navigate([params.accountName, params.datasetName], { diff --git a/src/app/services/templates-yaml-events.service.ts b/src/app/services/templates-yaml-events.service.ts index 275ae974a..653967a27 100644 --- a/src/app/services/templates-yaml-events.service.ts +++ b/src/app/services/templates-yaml-events.service.ts @@ -1,4 +1,8 @@ -import { SetLicense, SetPollingSource } from "./../api/kamu.graphql.interface"; +import { + SetLicense, + SetPollingSource, + SetTransform, +} from "./../api/kamu.graphql.interface"; import { Injectable } from "@angular/core"; import { MaybeNull } from "../common/app.types"; import { stringify } from "yaml"; @@ -56,6 +60,16 @@ export class TemplatesYamlEventsService { return stringify(this.initialTemplate); } + public buildYamlSetTransformEvent( + params: Omit, + ): string { + this.initialTemplate.content = { + kind: "setTransform", + ...params, + }; + return stringify(this.initialTemplate); + } + public buildYamlSetWatermarkEvent(dateTime: string): string { let result = this.initialSetWatermarkTemplate; result += ` outputWatermark: ${dateTime}`; diff --git a/src/assets/images/datafusion-logo.png b/src/assets/images/datafusion-logo.png new file mode 100644 index 000000000..1cc7a9ce6 Binary files /dev/null and b/src/assets/images/datafusion-logo.png differ diff --git a/src/styles.sass b/src/styles.sass index 77f258c40..ab05ddb16 100644 --- a/src/styles.sass +++ b/src/styles.sass @@ -1034,3 +1034,32 @@ pre .cdk-global-scrollblock overflow: hidden !important + +.mat-tree + .mat-tree-node + .mat-icon-button + background-color: transparent !important + +.w-425 + width: 370px!important + +.button-custom + display: flex + padding: 5px 10px + align-items: center + justify-content: center + border: 1px solid black + border-radius: 5px + &:hover + cursor: pointer + filter: alpha(opacity=90) + -moz-opacity: 0.9 + opacity: 0.9 + +.no-button + border: none + background: transparent + &:hover + cursor: pointer + &:enabled + color: blue