Skip to content

Commit

Permalink
fix: add sharding to s3 blockstore (#202)
Browse files Browse the repository at this point in the history
To avoid non-obvious performance footguns, shard by default
  • Loading branch information
achingbrain authored Mar 24, 2023
1 parent 405250f commit e1324a1
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 83 deletions.
49 changes: 12 additions & 37 deletions packages/blockstore-s3/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,29 @@ import {
DeleteObjectCommand,
ListObjectsV2Command
} from '@aws-sdk/client-s3'
import { CID } from 'multiformats/cid'
import { base32upper } from 'multiformats/bases/base32'
import type { MultibaseCodec } from 'multiformats/bases/interface'
import type { CID } from 'multiformats/cid'
import { NextToLast, ShardingStrategy } from './sharding.js'

export interface S3DatastoreInit {
/**
* An optional path to use within the bucket for all files - this setting can
* affect S3 performance as it does internal sharding based on 'prefixes' -
* these can be delimited by '/' so it's often better to wrap this datastore in
* a sharding datastore which will generate prefixed datastore keys for you.
*
* See - https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
* and https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html
*/
path?: string

/**
* Whether to try to create the bucket if it is missing when `.open` is called
*/
createIfMissing?: boolean

/**
* The multibase codec to use - nb. should be case insensitive.
* default: base32upper
* Control how CIDs map to paths and back
*/
base?: MultibaseCodec<string>
shardingStrategy?: ShardingStrategy
}

/**
* A blockstore backed by AWS S3
*/
export class S3Blockstore extends BaseBlockstore {
public path?: string
public createIfMissing: boolean
private readonly s3: S3
private readonly bucket: string
private readonly base: MultibaseCodec<string>
private readonly shardingStrategy: ShardingStrategy

constructor (s3: S3, bucket: string, init?: S3DatastoreInit) {
super()
Expand All @@ -62,21 +48,10 @@ export class S3Blockstore extends BaseBlockstore {
throw new Error('An bucket must be supplied. See the datastore-s3 README for examples.')
}

this.path = init?.path
this.s3 = s3
this.bucket = bucket
this.createIfMissing = init?.createIfMissing ?? false
this.base = init?.base ?? base32upper
}

/**
* Returns the full key which includes the path to the ipfs store
*/
_getFullKey (cid: CID): string {
// Avoid absolute paths with s3
const str = this.base.encoder.encode(cid.multihash.bytes)

return [this.path, str].filter(Boolean).join('/').replace(/\/\/+/g, '/')
this.shardingStrategy = init?.shardingStrategy ?? new NextToLast()
}

/**
Expand All @@ -88,7 +63,7 @@ export class S3Blockstore extends BaseBlockstore {
await this.s3.send(
new PutObjectCommand({
Bucket: this.bucket,
Key: this._getFullKey(key),
Key: this.shardingStrategy.encode(key),
Body: val
}), {
abortSignal: options?.signal
Expand All @@ -110,7 +85,7 @@ export class S3Blockstore extends BaseBlockstore {
const data = await this.s3.send(
new GetObjectCommand({
Bucket: this.bucket,
Key: this._getFullKey(key)
Key: this.shardingStrategy.encode(key)
}), {
abortSignal: options?.signal
}
Expand Down Expand Up @@ -154,7 +129,7 @@ export class S3Blockstore extends BaseBlockstore {
await this.s3.send(
new HeadObjectCommand({
Bucket: this.bucket,
Key: this._getFullKey(key)
Key: this.shardingStrategy.encode(key)
}), {
abortSignal: options?.signal
}
Expand Down Expand Up @@ -185,7 +160,7 @@ export class S3Blockstore extends BaseBlockstore {
await this.s3.send(
new DeleteObjectCommand({
Bucket: this.bucket,
Key: this._getFullKey(key)
Key: this.shardingStrategy.encode(key)
}), {
abortSignal: options?.signal
}
Expand Down Expand Up @@ -224,7 +199,7 @@ export class S3Blockstore extends BaseBlockstore {
}

// Remove the path from the key
const cid = CID.decode(this.base.decoder.decode(d.Key.slice((this.path ?? '').length)))
const cid = this.shardingStrategy.decode(d.Key)

yield {
cid,
Expand Down Expand Up @@ -257,7 +232,7 @@ export class S3Blockstore extends BaseBlockstore {
await this.s3.send(
new HeadObjectCommand({
Bucket: this.bucket,
Key: this.path ?? ''
Key: ''
}), {
abortSignal: options?.signal
}
Expand Down
118 changes: 118 additions & 0 deletions packages/blockstore-s3/src/sharding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { CID } from 'multiformats/cid'
import { base32upper } from 'multiformats/bases/base32'
import type { MultibaseCodec } from 'multiformats/bases/interface'

/**
 * Controls how CIDs are mapped to object keys in the backing store and how
 * those keys are mapped back to CIDs.
 */
export interface ShardingStrategy {
  /** File extension appended to every encoded key, e.g. '.data' */
  extension: string
  /** Convert a CID into the key the block is stored under */
  encode: (cid: CID) => string
  /** Recover the CID from a key previously produced by `encode` */
  decode: (path: string) => CID
}

/**
 * Options for the {@link NextToLast} sharding strategy.
 */
export interface NextToLastInit {
  /**
   * The file extension to use. default: '.data'
   */
  extension?: string

  /**
   * How many characters to take from the end of the CID. default: 2
   */
  prefixLength?: number

  /**
   * The multibase codec to use - nb. should be case insensitive.
   * default: base32upper
   */
  base?: MultibaseCodec<string>
}

/**
 * Shards blocks by placing each one in a directory named after the trailing
 * characters of its multibase-encoded CID. This spreads blocks across many
 * directories instead of piling them all into a single one, which would
 * overwhelm most filesystems.
 */
export class NextToLast implements ShardingStrategy {
  public extension: string
  private readonly prefixLength: number
  private readonly base: MultibaseCodec<string>

  constructor (init: NextToLastInit = {}) {
    this.extension = init.extension ?? '.data'
    this.prefixLength = init.prefixLength ?? 2
    this.base = init.base ?? base32upper
  }

  /**
   * Encode the CID's multihash with the configured multibase codec and
   * return '<dir>/<encoded><extension>', where <dir> is the final
   * `prefixLength` characters of the encoded string.
   */
  encode (cid: CID): string {
    const encoded = this.base.encoder.encode(cid.multihash.bytes)
    const dir = encoded.substring(encoded.length - this.prefixLength)

    return `${dir}/${encoded}${this.extension}`
  }

  /**
   * Take the final path segment, strip the extension if present and decode
   * what remains back into a CID.
   */
  decode (str: string): CID {
    const segments = str.split('/')
    let name = segments[segments.length - 1]

    if (name == null) {
      throw new Error('Invalid path')
    }

    if (name.endsWith(this.extension)) {
      name = name.substring(0, name.length - this.extension.length)
    }

    return CID.decode(this.base.decoder.decode(name))
  }
}

/**
 * Options for the {@link FlatDirectory} sharding strategy.
 */
export interface FlatDirectoryInit {
  /**
   * The file extension to use. default: '.data'
   */
  extension?: string

  /**
   * NOTE(review): this option is never read by `FlatDirectory` — it looks
   * like a copy/paste from `NextToLastInit`. Setting it has no effect;
   * kept only for backwards compatibility.
   */
  prefixLength?: number

  /**
   * The multibase codec to use - nb. should be case insensitive.
   * default: base32upper
   */
  base?: MultibaseCodec<string>
}

/**
* A sharding strategy that does not do any sharding and stores all files
* in one directory. Only for testing, do not use in production.
*/
export class FlatDirectory implements ShardingStrategy {
public extension: string
private readonly base: MultibaseCodec<string>

constructor (init: NextToLastInit = {}) {
this.extension = init.extension ?? '.data'
this.base = init.base ?? base32upper
}

encode (cid: CID): string {
const str = this.base.encoder.encode(cid.multihash.bytes)

return `${str}${this.extension}`
}

decode (str: string): CID {
let fileName = str.split('/').pop()

if (fileName == null) {
throw new Error('Invalid path')
}

if (fileName.endsWith(this.extension)) {
fileName = fileName.substring(0, fileName.length - this.extension.length)
}

return CID.decode(this.base.decoder.decode(fileName))
}
}
47 changes: 1 addition & 46 deletions packages/blockstore-s3/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { CreateBucketCommand, PutObjectCommand, HeadObjectCommand, S3, GetObjectCommand } from '@aws-sdk/client-s3'
import { CreateBucketCommand, HeadObjectCommand, S3 } from '@aws-sdk/client-s3'
import defer from 'p-defer'
import { interfaceBlockstoreTests } from 'interface-blockstore-tests'
import { CID } from 'multiformats/cid'
Expand Down Expand Up @@ -43,25 +43,6 @@ describe('S3Blockstore', () => {
})

describe('put', () => {
it('should include the path in the key', async () => {
const s3 = new S3({ region: 'REGION' })
const store = new S3Blockstore(s3, 'test', {
path: '.ipfs/datastore'
})

const deferred = defer<PutObjectCommand>()

sinon.replace(s3, 'send', (command: PutObjectCommand) => {
deferred.resolve(command)
return s3Resolve(null)
})

await store.put(cid, new TextEncoder().encode('test data'))

const command = await deferred.promise
expect(command).to.have.nested.property('input.Key', '.ipfs/datastore/BCIQPGZJ6QLZOFG3OP45NLMSJUWGJCO72QQKHLDTB6FXIB6BDSLRQYLY')
})

it('should return a standard error when the put fails', async () => {
const s3 = new S3({ region: 'REGION' })
const store = new S3Blockstore(s3, 'test')
Expand All @@ -80,32 +61,6 @@ describe('S3Blockstore', () => {
})

describe('get', () => {
it('should include the path in the fetch key', async () => {
const s3 = new S3({ region: 'REGION' })
const store = new S3Blockstore(s3, 'test', {
path: '.ipfs/datastore'
})
const buf = new TextEncoder().encode('test')

const deferred = defer<GetObjectCommand>()

sinon.replace(s3, 'send', (command: any) => {
if (command.constructor.name === 'GetObjectCommand') {
deferred.resolve(command)
return s3Resolve({ Body: buf })
}

return s3Reject(new S3Error('UnknownCommand'))
})

const value = await store.get(cid)

expect(value).to.equalBytes(buf)

const getObjectCommand = await deferred.promise
expect(getObjectCommand).to.have.nested.property('input.Key', '.ipfs/datastore/BCIQPGZJ6QLZOFG3OP45NLMSJUWGJCO72QQKHLDTB6FXIB6BDSLRQYLY')
})

it('should return a standard not found error code if the key isn\'t found', async () => {
const s3 = new S3({ region: 'REGION' })
const store = new S3Blockstore(s3, 'test')
Expand Down

0 comments on commit e1324a1

Please sign in to comment.