Skip to content
This repository has been archived by the owner on Jan 1, 2025. It is now read-only.

Commit

Permalink
fix: migration fixes for testing scenarios.
Browse files Browse the repository at this point in the history
  • Loading branch information
elribonazo committed Feb 23, 2024
1 parent 21fdc5a commit a4d8dff
Showing 1 changed file with 28 additions and 19 deletions.
47 changes: 28 additions & 19 deletions packages/inmemory/src/inMemoryStorage/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import { fixTxPipe } from '@pluto-encrypted/shared'
import { type QueryMatcher } from 'rxdb/dist/types/types'

export class RxStorageIntanceInMemory<RxDocType> implements RxStorageInstance<
RxDocType,
InMemoryStorageInternals<RxDocType>,
InMemorySettings,
RxStorageDefaultCheckpoint> {
RxDocType,
InMemoryStorageInternals<RxDocType>,
InMemorySettings,
RxStorageDefaultCheckpoint> {
public readonly primaryPath: StringKeys<RxDocumentData<RxDocType>>
public conflictResultionTasks$ = new Subject<RxConflictResultionTask<RxDocType>>()
public changes$ = new Subject<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, RxStorageDefaultCheckpoint>>()
public closed: boolean = false

constructor (
constructor(
public readonly storage: RxStorageInMemoryType<RxDocType>,
public readonly databaseName: string,
public readonly collectionName: string,
Expand All @@ -28,7 +28,7 @@ RxStorageDefaultCheckpoint> {
this.primaryPath = getPrimaryFieldOfPrimaryKey(this.schema.primaryKey)
}

async bulkWrite (
async bulkWrite(
documentWrites: Array<BulkWriteRow<RxDocType>>,
context: string): Promise<RxStorageBulkWriteResponse<RxDocType>> {
const primaryPath = this.primaryPath
Expand All @@ -42,8 +42,17 @@ RxStorageDefaultCheckpoint> {
const currentId = currentWriteDoc.document[this.primaryPath] as any
const previousDocument = currentWriteDoc.previous ?? this.internals.documents.get(currentId)
if (context === 'data-migrator-delete') {
if (previousDocument && previousDocument._rev === currentWriteDoc.document._rev) {
fixedDocs.push(currentWriteDoc)
if (previousDocument) {
fixedDocs.push({
document: {
...previousDocument,
_deleted: true
},
previous: {
...previousDocument,
_deleted: false
}
});
}
} else {
if (previousDocument && previousDocument._rev !== currentWriteDoc.document._rev) {
Expand Down Expand Up @@ -100,11 +109,11 @@ RxStorageDefaultCheckpoint> {
return await Promise.resolve(ret)
}

async findDocumentsById (ids: string[], withDeleted: boolean): Promise<RxDocumentDataById<RxDocType>> {
async findDocumentsById(ids: string[], withDeleted: boolean): Promise<RxDocumentDataById<RxDocType>> {
return this.internals.bulkGet(ids, withDeleted)
}

async query (preparedQuery: InMemoryPreparedQuery<RxDocType>): Promise<RxStorageQueryResult<RxDocType>> {
async query(preparedQuery: InMemoryPreparedQuery<RxDocType>): Promise<RxStorageQueryResult<RxDocType>> {
const { queryPlan, query } = preparedQuery
const selector = query.selector
const selectorKeys = Object.keys(selector)
Expand Down Expand Up @@ -175,7 +184,7 @@ RxStorageDefaultCheckpoint> {
}
}

async count (preparedQuery: any): Promise<RxStorageCountResult> {
async count(preparedQuery: any): Promise<RxStorageCountResult> {
const result = await this.query(preparedQuery)
return {
count: result.documents.length,
Expand All @@ -184,28 +193,28 @@ RxStorageDefaultCheckpoint> {
}

/* istanbul ignore next */
async getAttachmentData (): Promise<string> {
async getAttachmentData(): Promise<string> {
throw new Error('Method not implemented.')
}

/* istanbul ignore next */
async getChangedDocumentsSince (): Promise<{ documents: Array<RxDocumentData<RxDocType>>, checkpoint: RxStorageDefaultCheckpoint }> {
async getChangedDocumentsSince(): Promise<{ documents: Array<RxDocumentData<RxDocType>>, checkpoint: RxStorageDefaultCheckpoint }> {
throw new Error('Method not implemented.')
}

/* istanbul ignore next */
changeStream (): Observable<EventBulk<RxStorageChangeEvent<RxDocType>, RxStorageDefaultCheckpoint>> {
changeStream(): Observable<EventBulk<RxStorageChangeEvent<RxDocType>, RxStorageDefaultCheckpoint>> {
return this.changes$.asObservable()
}

async cleanup (): Promise<boolean> {
async cleanup(): Promise<boolean> {
this.internals.clear()

return true
}

/* istanbul ignore next */
async close (): Promise<void> {
async close(): Promise<void> {
if (this.closed) {
await Promise.reject(new Error('already closed')); return
}
Expand All @@ -215,16 +224,16 @@ RxStorageDefaultCheckpoint> {
}

/* istanbul ignore next */
async remove (): Promise<void> {
async remove(): Promise<void> {
await Promise.resolve()
}

conflictResultionTasks (): Observable<RxConflictResultionTask<RxDocType>> {
conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
return this.conflictResultionTasks$.asObservable()
}

/* istanbul ignore next */
async resolveConflictResultionTask (): Promise<void> {
async resolveConflictResultionTask(): Promise<void> {
await Promise.resolve()
}
}

0 comments on commit a4d8dff

Please sign in to comment.