Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: integration tests #47

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# uncomment to try out deploying the api under a custom domain.
# the value should match a hosted zone configured in route53 that your aws account has access to.
# HOSTED_ZONE=filecoin.dag.haus
# HOSTED_ZONE=filecoin.web3.storage

# uncomment to set SENTRY_DSN
# SENTRY_DSN = ''
Expand Down
36 changes: 21 additions & 15 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
"lint": "tsc && eslint '**/*.js'",
"clean": "rm -rf dist node_modules ./*/{.cache,dist,node_modules}",
"test": "npm test -w packages/core -w packages/filecoin-client",
"test-integration": "ava --verbose --serial --timeout=300s test/*.test.js"
"test-integration": "ava --verbose --serial --timeout=600s test/*.test.js"
},
"devDependencies": {
"@ipld/dag-ucan": "3.4.0",
"@sentry/serverless": "^7.52.1",
"@tsconfig/node16": "^1.0.3",
"@types/git-rev-sync": "^2.0.0",
"@types/node": "^18.11.18",
"@ucanto/client": "8.0.0",
"@ucanto/principal": "8.0.0",
"@ucanto/transport": "8.0.0",
"@web3-storage/filecoin-api": "^1.4.3",
"@web3-storage/filecoin-client": "1.3.0",
"ava": "^5.3.0",
"aws-cdk-lib": "^2.84.0",
"constructs": "10.1.156",
Expand Down Expand Up @@ -52,6 +58,7 @@
"unicorn/filename-case": "off",
"unicorn/prefer-set-has": "off",
"unicorn/no-array-callback-reference": "off",
"unicorn/no-array-reduce": "off",
"unicorn/no-await-expression-member": "off",
"unicorn/no-zero-fractions": "off",
"no-console": "off",
Expand Down
1 change: 0 additions & 1 deletion packages/core/src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import * as DID from '@ipld/dag-ucan/did'
import { CAR, HTTP } from '@ucanto/transport'
import { connect } from '@ucanto/client'


import { createService } from '@web3-storage/filecoin-api/aggregator'

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ export const aggregateStoreTableProps = {
partitionKey: 'stat',
sortKey: 'insertedAt',
projection: 'all'
},
indexStorefront: {
partitionKey: 'storefront',
sortKey: 'insertedAt',
projection: 'all'
Comment on lines +59 to +62
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added new index so that we can query all aggregates by a given storefront. We currently use this for the integration tests, given we create a new storefront did per integration test run, it will always be a new one.

This will likely not be needed in the day to day of operating this system. However, it might be a query important to also do ad hoc.

}
}
}
Expand Down
1 change: 0 additions & 1 deletion packages/core/src/store/table-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ export function createTableStoreClient (conf, context) {
}),
})

// retry to avoid throttling errors
try {
await tableclient.send(putCmd)
} catch (/** @type {any} */ error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { decode as aggregateDecode } from '../data/aggregate.js'
* @param {import('@web3-storage/filecoin-client/types').InvocationConfig} props.invocationConfig
* @param {import('@ucanto/principal/ed25519').ConnectionView<any>} props.dealerServiceConnection
*/
export async function dealerAdd ({
export async function dealerQueue ({
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to what this actually does, which is queuing the deal for the dealer to handle

bufferStoreClient,
aggregateStoreClient,
aggregateRecord,
Expand All @@ -36,7 +36,7 @@ export async function dealerAdd ({
})

// Add aggregate to dealer
const add = await Dealer.dealAdd(
const dealQueue = await Dealer.dealQueue(
invocationConfig,
aggregate.link,
bufferReference.ok.pieces.map(p => p.piece),
Expand All @@ -45,29 +45,29 @@ export async function dealerAdd ({
{ connection: dealerServiceConnection }
)

if (add.out.error) {
if (dealQueue.out.error) {
return {
error: add.out.error
error: dealQueue.out.error
}
}

// Save aggregate
const aggregateStored = await aggregateStoreClient.put({
piece: aggregate.link,
buffer: bufferRef.buffer,
task: add.ran.link(),
invocation: add.ran.link(),
task: dealQueue.ran.link(),
invocation: dealQueue.ran.link(),
insertedAt: Date.now(),
storefront: bufferReference.ok.storefront,
group: bufferReference.ok.group,
stat: 0
})

if (aggregateStored.error) {
return {
error: aggregateStored.error
}
}

return {
ok: 1
}
Expand Down
16 changes: 8 additions & 8 deletions packages/core/test/helpers/mocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ const notImplemented = () => {
*/
export function mockService(impl) {
return {
deal: {
add: withCallCount(impl.deal?.add ?? notImplemented),
queue: withCallCount(impl.deal?.queue ?? notImplemented),
},
aggregate: {
add: withCallCount(impl.aggregate?.add ?? notImplemented),
queue: withCallCount(impl.aggregate?.queue ?? notImplemented),
},
deal: {
add: withCallCount(impl.deal?.add ?? notImplemented),
queue: withCallCount(impl.deal?.queue ?? notImplemented),
},
aggregate: {
add: withCallCount(impl.aggregate?.add ?? notImplemented),
queue: withCallCount(impl.aggregate?.queue ?? notImplemented),
},
}
}

Expand Down
6 changes: 3 additions & 3 deletions packages/core/test/workflow/buffer-reducing.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ test('can reduce received buffers by creating an aggregate and remaining buffer'

const bucketName = await createBucket(s3)
const { buffers, bufferRecords } = await getBuffers(2, {
length: 100,
size: 128
length: 10,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just changed to use same values as integration tests and be able to catch potential issues faster there

size: 1024
})

const storeClient = createBucketStoreClient(s3, {
Expand Down Expand Up @@ -152,7 +152,7 @@ test('can reduce received buffers by creating an aggregate and remaining buffer'
aggregateQueueClient,
bufferRecords,
minAggregateSize: 2 ** 13,
maxAggregateSize: 2 ** 22
maxAggregateSize: 2 ** 15
})

t.falsy(reduceBufferResp.error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { createTableStoreClient } from '../../src/store/table-client.js'
import { createBucketStoreClient } from '../../src/store/bucket-client.js'
import { aggregateStoreTableProps } from '../../src/store/index.js'

import { dealerAdd } from '../../src/workflow/dealer-add.js'
import { dealerQueue } from '../../src/workflow/dealer-queue.js'

/**
* @typedef {import('../../src/data/types.js').PiecePolicy} PiecePolicy
Expand All @@ -44,26 +44,26 @@ test('can add produced aggregate', async t => {
// Store buffer used for aggregate
await bufferStoreClient.put(buffer)

const dealAddCall = pDefer()
const dealerQueueCall = pDefer()
const { invocationConfig, dealerService } = await getService({
onCall: dealAddCall
onCall: dealerQueueCall
})
const dealerAddResp = await dealerAdd({
const dealerQueueResp = await dealerQueue({
bufferStoreClient,
aggregateStoreClient,
aggregateRecord: await aggregateEncode.message(aggregateRecord),
invocationConfig,
dealerServiceConnection: dealerService.connection
})

t.truthy(dealerAddResp.ok)
t.falsy(dealerAddResp.error)
t.is(dealerAddResp.ok, 1)
t.truthy(dealerQueueResp.ok)
t.falsy(dealerQueueResp.error)
t.is(dealerQueueResp.ok, 1)

// Validate ucanto server call
t.is(dealerService.service.deal.add.callCount, 1)
const invCap = await dealAddCall.promise
t.is(invCap.can, 'deal/add')
t.is(dealerService.service.deal.queue.callCount, 1)
const invCap = await dealerQueueCall.promise
t.is(invCap.can, 'deal/queue')

// TODO: validate CID of piece invCap.nb.piece
// TODO: validate deal content invCap.nb.deal
Expand All @@ -77,24 +77,24 @@ test('fails adding aggregate if fails to read from store', async t => {
aggregateStoreClient
} = await getContext(t.context)

const dealAddCall = pDefer()
const dealerQueueCall = pDefer()
const { invocationConfig, dealerService } = await getService({
onCall: dealAddCall
onCall: dealerQueueCall
})
const dealerAddResp = await dealerAdd({
const dealerQueueResp = await dealerQueue({
bufferStoreClient,
aggregateStoreClient,
aggregateRecord: await aggregateEncode.message(aggregateRecord),
invocationConfig,
dealerServiceConnection: dealerService.connection
})

t.falsy(dealerAddResp.ok)
t.truthy(dealerAddResp.error)
t.is(dealerAddResp.error?.name, StoreOperationErrorName)
t.falsy(dealerQueueResp.ok)
t.truthy(dealerQueueResp.error)
t.is(dealerQueueResp.error?.name, StoreOperationErrorName)

// Validate ucanto server call
t.is(dealerService.service.deal.add.callCount, 0)
t.is(dealerService.service.deal.queue.callCount, 0)
})

test('fails adding aggregate if fails to add to dealer', async t => {
Expand All @@ -108,25 +108,25 @@ test('fails adding aggregate if fails to add to dealer', async t => {
// Store buffer used for aggregate
await bufferStoreClient.put(buffer)

const dealAddCall = pDefer()
const dealerQueueCall = pDefer()
const { invocationConfig, dealerService } = await getService({
onCall: dealAddCall,
onCall: dealerQueueCall,
mustFail: true
})
const dealerAddResp = await dealerAdd({
const dealerQueueResp = await dealerQueue({
bufferStoreClient,
aggregateStoreClient,
aggregateRecord: await aggregateEncode.message(aggregateRecord),
invocationConfig,
dealerServiceConnection: dealerService.connection
})

t.falsy(dealerAddResp.ok)
t.truthy(dealerAddResp.error)
t.is(dealerAddResp.error?.name, OperationErrorName)
t.falsy(dealerQueueResp.ok)
t.truthy(dealerQueueResp.error)
t.is(dealerQueueResp.error?.name, OperationErrorName)

// Validate ucanto server call
t.is(dealerService.service.deal.add.callCount, 1)
t.is(dealerService.service.deal.queue.callCount, 1)
})

/**
Expand Down
Loading
Loading