Skip to content
This repository has been archived by the owner on Mar 23, 2023. It is now read-only.

Commit

Permalink
feat: add streaming/cancellable API (#39)
Browse files Browse the repository at this point in the history
Upgrades to the latest interface-datastore which includes streaming APIs and passing AbortControllers.

Uses the new Adapter class to implement these with minimal code changes.

* Splits out open so it's not called from the constructor any more
* Combines some of the private methods to make code flow simpler
* Removes code duplicated by the interface adapter
  • Loading branch information
achingbrain authored May 7, 2020
1 parent c6f2762 commit 5232c1c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 122 deletions.
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
},
"homepage": "https://github.com/ipfs/js-datastore-fs#readme",
"dependencies": {
"datastore-core": "^1.0.0",
"fast-write-atomic": "~0.2.0",
"datastore-core": "^1.1.0",
"fast-write-atomic": "^0.2.0",
"glob": "^7.1.3",
"interface-datastore": "~0.8.3",
"interface-datastore": "^1.0.2",
"mkdirp": "^1.0.4"
},
"devDependencies": {
"aegir": "^21.10.2",
"aegir": "^22.0.0",
"async-iterator-all": "^1.0.0",
"chai": "^4.2.0",
"cids": "~0.8.0",
Expand Down
138 changes: 22 additions & 116 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,17 @@ const mkdirp = require('mkdirp')
const promisify = require('util').promisify
const writeAtomic = promisify(require('fast-write-atomic'))
const path = require('path')

const filter = require('interface-datastore').utils.filter
const take = require('interface-datastore').utils.take
const map = require('interface-datastore').utils.map
const sortAll = require('interface-datastore').utils.sortAll
const IDatastore = require('interface-datastore')
const {
Adapter, Key, Errors, utils: {
map
}
} = require('interface-datastore')

const noop = () => {}
const fsAccess = promisify(fs.access || noop)
const fsReadFile = promisify(fs.readFile || noop)
const fsUnlink = promisify(fs.unlink || noop)

const Key = IDatastore.Key
const Errors = IDatastore.Errors

async function writeFile (path, contents) {
try {
await writeAtomic(path, contents)
Expand All @@ -47,63 +43,30 @@ async function writeFile (path, contents) {
* Keys need to be sanitized before use, as they are written
* to the file system as is.
*/
class FsDatastore {
class FsDatastore extends Adapter {
constructor (location, opts) {
super()

this.path = path.resolve(location)
this.opts = Object.assign({}, {
createIfMissing: true,
errorIfExists: false,
extension: '.data'
}, opts)

if (this.opts.createIfMissing) {
this._openOrCreate()
} else {
this._open()
}
}

open () {
this._openOrCreate()
}

/**
* Check if the path actually exists.
* @private
* @returns {void}
*/
_open () {
if (!fs.existsSync(this.path)) {
throw Errors.notFoundError(new Error(`Datastore directory: ${this.path} does not exist`))
}

if (this.opts.errorIfExists) {
throw Errors.dbOpenFailedError(new Error(`Datastore directory: ${this.path} already exists`))
}
}

/**
* Create the directory to hold our data.
*
* @private
* @returns {void}
*/
_create () {
mkdirp.sync(this.path, { fs: fs })
}

/**
* Tries to open, and creates if the open fails.
*
* @private
* @returns {void}
*/
_openOrCreate () {
try {
this._open()
if (!fs.existsSync(this.path)) {
throw Errors.notFoundError(new Error(`Datastore directory: ${this.path} does not exist`))
}

if (this.opts.errorIfExists) {
throw Errors.dbOpenFailedError(new Error(`Datastore directory: ${this.path} already exists`))
}
} catch (err) {
if (err.code === 'ERR_NOT_FOUND') {
this._create()
if (err.code === 'ERR_NOT_FOUND' && this.opts.createIfMissing) {
mkdirp.sync(this.path, { fs: fs })
return
}

Expand Down Expand Up @@ -165,7 +128,7 @@ class FsDatastore {
}

/**
* Store the given value under the key.
* Store the given value under the key
*
* @param {Key} key
* @param {Buffer} val
Expand Down Expand Up @@ -252,84 +215,27 @@ class FsDatastore {
}
}

/**
* Create a new batch object.
*
* @returns {Batch}
*/
batch () {
const puts = []
const deletes = []
return {
put (key, value) {
puts.push({ key: key, value: value })
},
delete (key) {
deletes.push(key)
},
commit: () /* : Promise<void> */ => {
return Promise.all(
puts
.map((put) => this.put(put.key, put.value))
.concat(
deletes.map((del) => this.delete(del))
)
)
}
}
}

/**
* Query the store.
*
* @param {Object} q
* @returns {Iterable}
*/
query (q) {
async * _all (q) { // eslint-disable-line require-await
// glob expects a POSIX path
const prefix = q.prefix || '**'
const pattern = path
.join(this.path, prefix, '*' + this.opts.extension)
.split(path.sep)
.join('/')
const files = glob.sync(pattern)
let it

if (!q.keysOnly) {
it = map(files, async (f) => {
yield * map(files, async (f) => {
const buf = await fsReadFile(f)
return {
key: this._decode(f),
value: buf
}
})
} else {
it = map(files, f => ({ key: this._decode(f) }))
}

if (Array.isArray(q.filters)) {
it = q.filters.reduce((it, f) => filter(it, f), it)
}

if (Array.isArray(q.orders)) {
it = q.orders.reduce((it, f) => sortAll(it, f), it)
}

if (q.offset != null) {
let i = 0
it = filter(it, () => i++ >= q.offset)
yield * map(files, f => ({ key: this._decode(f) }))
}

if (q.limit != null) {
it = take(it, q.limit)
}

return it
}

/**
* Close the store.
*/
close () { }
}

module.exports = FsDatastore
8 changes: 6 additions & 2 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,23 @@ describe('FsDatastore', () => {
() => new FsStore(dir)
).to.not.throw()
})
})

describe('open', () => {
it('createIfMissing: false - folder missing', () => {
const dir = utils.tmpdir()
const store = new FsStore(dir, { createIfMissing: false })
expect(
() => new FsStore(dir, { createIfMissing: false })
() => store.open()
).to.throw()
})

it('errorIfExists: true - folder exists', () => {
const dir = utils.tmpdir()
mkdirp.sync(dir)
const store = new FsStore(dir, { errorIfExists: true })
expect(
() => new FsStore(dir, { errorIfExists: true })
() => store.open()
).to.throw()
})
})
Expand Down

0 comments on commit 5232c1c

Please sign in to comment.