-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
To enable use cases where blocks may be stored in more than one location, add a `TieredBlockstore` class to `blockstore-core` similar to the`TieredDatastore` class found in `datastore-core`.
- Loading branch information
1 parent
4279b47
commit 5143948
Showing
5 changed files
with
246 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import { logger } from '@libp2p/logger' | ||
import drain from 'it-drain' | ||
import filter from 'it-filter' | ||
import merge from 'it-merge' | ||
import { pushable } from 'it-pushable' | ||
import { BaseBlockstore } from './base.js' | ||
import * as Errors from './errors.js' | ||
import type { Blockstore, Pair } from 'interface-blockstore' | ||
import type { AbortOptions, AwaitIterable } from 'interface-store' | ||
import type { CID } from 'multiformats/cid' | ||
|
||
const log = logger('blockstore:core:tiered') | ||
|
||
/** | ||
* A blockstore that can combine multiple stores. Puts and deletes | ||
* will write through to all blockstores. Has and get will | ||
* try each store sequentially. getAll will use every store but also | ||
* deduplicate any yielded pairs. | ||
*/ | ||
export class TieredBlockstore extends BaseBlockstore { | ||
private readonly stores: Blockstore[] | ||
|
||
constructor (stores: Blockstore[]) { | ||
super() | ||
|
||
this.stores = stores.slice() | ||
} | ||
|
||
async put (key: CID, value: Uint8Array, options?: AbortOptions): Promise<CID> { | ||
try { | ||
await Promise.all(this.stores.map(async store => { await store.put(key, value, options) })) | ||
return key | ||
} catch (err: any) { | ||
throw Errors.putFailedError(err) | ||
} | ||
} | ||
|
||
async get (key: CID, options?: AbortOptions): Promise<Uint8Array> { | ||
for (const store of this.stores) { | ||
try { | ||
const res = await store.get(key, options) | ||
if (res != null) return res | ||
} catch (err) { | ||
log.error(err) | ||
} | ||
} | ||
throw Errors.notFoundError() | ||
} | ||
|
||
async has (key: CID, options?: AbortOptions): Promise<boolean> { | ||
for (const s of this.stores) { | ||
if (await s.has(key, options)) { | ||
return true | ||
} | ||
} | ||
|
||
return false | ||
} | ||
|
||
async delete (key: CID, options?: AbortOptions): Promise<void> { | ||
try { | ||
await Promise.all(this.stores.map(async store => { await store.delete(key, options) })) | ||
} catch (err: any) { | ||
throw Errors.deleteFailedError(err) | ||
} | ||
} | ||
|
||
async * putMany (source: AwaitIterable<Pair>, options: AbortOptions = {}): AsyncIterable<CID> { | ||
let error: Error | undefined | ||
const pushables = this.stores.map(store => { | ||
const source = pushable<Pair>({ | ||
objectMode: true | ||
}) | ||
|
||
drain(store.putMany(source, options)) | ||
.catch(err => { | ||
// store threw while putting, make sure we bubble the error up | ||
error = err | ||
}) | ||
|
||
return source | ||
}) | ||
|
||
try { | ||
for await (const pair of source) { | ||
if (error != null) { | ||
throw error | ||
} | ||
|
||
pushables.forEach(p => p.push(pair)) | ||
|
||
yield pair.cid | ||
} | ||
} finally { | ||
pushables.forEach(p => p.end()) | ||
} | ||
} | ||
|
||
async * deleteMany (source: AwaitIterable<CID>, options: AbortOptions = {}): AsyncIterable<CID> { | ||
let error: Error | undefined | ||
const pushables = this.stores.map(store => { | ||
const source = pushable<CID>({ | ||
objectMode: true | ||
}) | ||
|
||
drain(store.deleteMany(source, options)) | ||
.catch(err => { | ||
// store threw while deleting, make sure we bubble the error up | ||
error = err | ||
}) | ||
|
||
return source | ||
}) | ||
|
||
try { | ||
for await (const key of source) { | ||
if (error != null) { | ||
throw error | ||
} | ||
|
||
pushables.forEach(p => p.push(key)) | ||
|
||
yield key | ||
} | ||
} finally { | ||
pushables.forEach(p => p.end()) | ||
} | ||
} | ||
|
||
async * getAll (options?: AbortOptions): AwaitIterable<Pair> { // eslint-disable-line require-yield | ||
// deduplicate yielded pairs | ||
const seen = new Set<string>() | ||
|
||
yield * filter(merge(...this.stores.map(s => s.getAll(options))), (pair) => { | ||
const cidStr = pair.cid.toString() | ||
|
||
if (seen.has(cidStr)) { | ||
return false | ||
} | ||
|
||
seen.add(cidStr) | ||
|
||
return true | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* eslint-env mocha */ | ||
|
||
import { expect } from 'aegir/chai' | ||
import { interfaceBlockstoreTests } from 'interface-blockstore-tests' | ||
import { CID } from 'multiformats/cid' | ||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' | ||
import { MemoryBlockstore } from '../src/memory.js' | ||
import { TieredBlockstore } from '../src/tiered.js' | ||
import type { Blockstore } from 'interface-blockstore' | ||
|
||
describe('Tiered', () => { | ||
describe('all stores', () => { | ||
const ms: Blockstore[] = [] | ||
let store: TieredBlockstore | ||
beforeEach(() => { | ||
ms.push(new MemoryBlockstore()) | ||
ms.push(new MemoryBlockstore()) | ||
store = new TieredBlockstore(ms) | ||
}) | ||
|
||
it('put', async () => { | ||
const k = CID.parse('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL') | ||
const v = uint8ArrayFromString('world') | ||
await store.put(k, v) | ||
const res = await Promise.all([ms[0].get(k), ms[1].get(k)]) | ||
res.forEach((val) => { | ||
expect(val).to.be.eql(v) | ||
}) | ||
}) | ||
|
||
it('get and has, where available', async () => { | ||
const k = CID.parse('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL') | ||
const v = uint8ArrayFromString('world') | ||
await ms[1].put(k, v) | ||
const val = await store.get(k) | ||
expect(val).to.be.eql(v) | ||
const exists = await store.has(k) | ||
expect(exists).to.be.eql(true) | ||
}) | ||
|
||
it('has - key not found', async () => { | ||
expect(await store.has(CID.parse('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnA'))).to.be.eql(false) | ||
}) | ||
|
||
it('has and delete', async () => { | ||
const k = CID.parse('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL') | ||
const v = uint8ArrayFromString('world') | ||
await store.put(k, v) | ||
let res = await Promise.all([ms[0].has(k), ms[1].has(k)]) | ||
expect(res).to.be.eql([true, true]) | ||
await store.delete(k) | ||
res = await Promise.all([ms[0].has(k), ms[1].has(k)]) | ||
expect(res).to.be.eql([false, false]) | ||
}) | ||
}) | ||
|
||
describe('inteface-datastore-single', () => { | ||
interfaceBlockstoreTests({ | ||
setup () { | ||
return new TieredBlockstore([ | ||
new MemoryBlockstore(), | ||
new MemoryBlockstore() | ||
]) | ||
}, | ||
teardown () { } | ||
}) | ||
}) | ||
}) |