Skip to content

Commit

Permalink
1.4.3
Browse files Browse the repository at this point in the history
* version bump

* add maxwps/COUCH_MAX_WPS config for rate limiting writes

* 1.4.3
  • Loading branch information
glynnbird authored Apr 7, 2020
1 parent ddd3320 commit 55b1958
Show file tree
Hide file tree
Showing 6 changed files with 849 additions and 464 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ If you are importing data into a CouchDB database that already contains data, an
* COUCH_PREVIEW - run in preview mode
* COUCH_IGNORE_FIELDS - a comma-separated list of field names to ignore on import or export e.g. price,url,image
* COUCH_OVERWRITE - overwrite existing document revisions with supplied data
* COUCH_PARALLELISM - the maximum number of HTTP requests to have in flight at any one time (default: 1)
* COUCH_MAX_WPS - the maximum number of write API calls to make per second (rate limiting) (default: 0 - no rate limiting)
## Command-line parameters
Expand All @@ -239,6 +241,7 @@ You can also configure `couchimport` and `couchexport` using command-line parame
* `--preview`/`-p` - if 'true', runs in preview mode (default false)
* `--ignorefields`/`-i` - a comma-separated list of fields to ignore input or output (default none)
* `--parallelism` - the number of HTTP request to have in flight at any one time (default 1)
* `--maxwps` - the maximum number of write API calls to make per second (default 0 - no rate limiting)
* `--overwrite`/`-o` - overwrite existing document revisions with supplied data (default: false)
e.g.
Expand Down Expand Up @@ -387,10 +390,12 @@ The emitted data is an object containing:
* failed - the number of documents failed to write in the last batch
* totalfailed - the number of documents that failed to write in total
## Parallelism
## Parallelism & Rate limiting
Using the `COUCH_PARALLELISM` environment variable or the `--parallelism` command-line option, couchimport can be configured to write data in multiple parallel operations. If you have the networkbandwidth, this can significantly speed up large data imports e.g.
```sh
cat bigdata.csv | couchimport --database mydb --parallelism 10 --delimiter ","
```
This can be combined with the `COUCH_MAX_WPS`/`--maxwps` parameter to limit the number write API calls dispatched per second to make sure you don't exceed the number writes on a rate-limited service.
2 changes: 1 addition & 1 deletion app.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const importStream = function (rs, opts, callback) {
opts = defaults.merge(opts)

// load dependencies
const writer = require('./includes/writer.js')(opts.url, opts.database, opts.buffer, opts.parallelism, opts.ignorefields, opts.overwrite)
const writer = require('./includes/writer.js')(opts.url, opts.database, opts.buffer, opts.parallelism, opts.ignorefields, opts.overwrite, opts.maxwps)
const transformer = require('./includes/transformer.js')(opts.transform, opts.meta)
const JSONStream = require('JSONStream')
if (opts.type === 'jsonl') {
Expand Down
5 changes: 5 additions & 0 deletions includes/args.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ const parse = function () {
describe: 'the number of HTTP requests to have in-flight at any one time',
default: process.env.COUCH_PARALLELISM ? parseInt(process.env.COUCH_PARALLELISM) : 1
})
.option('maxwps', {
number: true,
describe: 'the maximum number of write operations to perform per second',
default: process.env.MAX_WPS ? parseInt(process.env.MAX_WPS) : 0
})
.option('type', {
alias: 't',
describe: 'the type of file being imported',
Expand Down
7 changes: 4 additions & 3 deletions includes/writer.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const async = require('async')
const qrate = require('qrate')
const debug = require('debug')('couchimport')
const iam = require('./iam.js')
const axios = require('axios').default
Expand All @@ -7,7 +8,7 @@ const axios = require('axios').default
const IAM_API_KEY = process.env.IAM_API_KEY ? process.env.IAM_API_KEY : null
let iamAccessToken = null

module.exports = function (couchURL, couchDatabase, bufferSize, parallelism, ignoreFields, overwrite) {
module.exports = function (couchURL, couchDatabase, bufferSize, parallelism, ignoreFields, overwrite, maxwps) {
const stream = require('stream')

let buffer = []
Expand All @@ -25,7 +26,7 @@ module.exports = function (couchURL, couchDatabase, bufferSize, parallelism, ign
}

// process the writes in bulk as a queue
const q = async.queue(async (payload) => {
const q = qrate(async (payload) => {
// detected whether we need to supply new_edits = false
let allHaveRev = true
for (var i in payload.docs) {
Expand Down Expand Up @@ -116,7 +117,7 @@ module.exports = function (couchURL, couchDatabase, bufferSize, parallelism, ign
totalfailed += failed
writer.emit('written', { documents: ok, failed: failed, total: written, totalfailed: totalfailed })
debug({ documents: ok, failed: failed, total: written, totalfailed: totalfailed })
}, parallelism)
}, parallelism, maxwps || undefined)

// write the contents of the buffer to CouchDB in blocks of 500
const processBuffer = function (flush, callback) {
Expand Down
Loading

0 comments on commit 55b1958

Please sign in to comment.