Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Accouting Service (WIP) #428

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions billing/data/egress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { DecodeFailure, EncodeFailure, Schema } from './lib.js'

/**
* @typedef {import('../lib/api').EgressEvent} EgressEvent
* @typedef {import('../types').InferStoreRecord<EgressEvent> & { pk: string, sk: string }} EgressEventStoreRecord
* @typedef {import('../types').StoreRecord} StoreRecord
* @typedef {import('../lib/api').EgressEventListKey} EgressEventListKey
* @typedef {{ pk: string, sk: string }} EgressEventListStoreRecord
*/

export const egressSchema = Schema.struct({
customerId: Schema.did({ method: 'mailto' }),
resourceId: Schema.text(),
timestamp: Schema.date(),
})

/** @type {import('../lib/api').Validator<EgressEvent>} */
export const validate = input => egressSchema.read(input)

/** @type {import('../lib/api').Encoder<EgressEvent, EgressEventStoreRecord>} */
export const encode = input => {
try {
return {
ok: {
pk: `${input.timestamp.toISOString()}#${input.customerId}`,
sk: `${input.timestamp.toISOString()}#${input.customerId}#${input.resourceId}`,
customerId: input.customerId,
resourceId: input.resourceId,
timestamp: input.timestamp.toISOString(),
}
}
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Encoder<EgressEvent, string>} */
export const encodeStr = input => {
try {
const data = encode(input)
if (data.error) throw data.error
return { ok: JSON.stringify(data.ok) }
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding string egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<StoreRecord, EgressEvent>} */
export const decode = input => {
try {
return {
ok: {
customerId: Schema.did({ method: 'mailto' }).from(input.customerId),
resourceId: /** @type {string} */ (input.resourceId),
timestamp: new Date(input.timestamp),
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding egress event: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api').Decoder<string, EgressEvent>} */
export const decodeStr = input => {
const data = decode(JSON.parse(input))
if (data.error) throw data.error
return { ok: data.ok }
}

export const lister = {
/** @type {import('../lib/api').Encoder<EgressEventListKey, EgressEventListStoreRecord>} */
encodeKey: input => ({
ok: {
pk: `${input.from.toISOString()}#${input.customerId}`,
sk: `${input.from.toISOString()}#${input.customerId}#${input.resourceId}`
}
})
}
111 changes: 111 additions & 0 deletions billing/functions/egress-traffic-handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import * as Sentry from '@sentry/serverless'
import { Config } from 'sst/node/config'
import { expect } from './lib.js'
import { createEgressEventStore } from '../tables/egress.js'
import { decode } from '../data/egress.js'
import { SQSClient, DeleteMessageCommand } from '@aws-sdk/client-sqs'
import { mustGetEnv } from '../../lib/env.js'
import Stripe from 'stripe'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0
})

/**
* @typedef {{
* egressTable?: string
* queueUrl?: string
* region?: 'us-west-2'|'us-east-2'
* stripeSecretKey?: string
* }} CustomHandlerContext
*/

/**
* AWS Lambda handler to process egress events from the egress traffic queue.
* Each event is a JSON object with a `customerId`, `resourceId` and `timestamp`.
* The event is decoded and stored in the egress event table.
* The message is then deleted from the queue when successful.
*/
export const handler = Sentry.AWSLambda.wrapHandler(
/**
* @param {import('aws-lambda').SQSEvent} event
* @param {import('aws-lambda').Context} context
*/
async (event, context) => {
/** @type {CustomHandlerContext|undefined} */
const customContext = context?.clientContext?.Custom
const region = customContext?.region ?? mustGetEnv('AWS_REGION')
const egressTable = customContext?.egressTable ?? mustGetEnv('EGRESS_TABLE_NAME')
const queueUrl = customContext?.queueUrl ?? mustGetEnv('EGRESS_TRAFFIC_QUEUE_URL')
const sqsClient = new SQSClient({ region })
const egressEventStore = createEgressEventStore({ region }, { tableName: egressTable })
const stripeSecretKey = customContext?.stripeSecretKey ?? Config.STRIPE_SECRET_KEY

if (!stripeSecretKey) throw new Error('missing secret: STRIPE_SECRET_KEY')
const stripe = new Stripe(stripeSecretKey, { apiVersion: '2023-10-16' })

for (const record of event.Records) {
try {
const messageBody = JSON.parse(record.body)
const decoded = decode(messageBody)
const egressEvent = expect(decoded, 'Failed to decode egress message')

expect(
await egressEventStore.put(egressEvent),
`Failed to store egress event for customerId: ${egressEvent.customerId}, resourceId: ${egressEvent.resourceId}, timestamp: ${egressEvent.timestamp.toISOString()}`
)

expect(
await sendRecordUsageToStripe(stripe, egressEvent),
`Failed to send record usage to Stripe for customerId: ${egressEvent.customerId}, resourceId: ${egressEvent.resourceId}, timestamp: ${egressEvent.timestamp.toISOString()}`
)

/**
* SQS requires explicit acknowledgment that a message has been successfully processed.
* This is done by deleting the message from the queue using its ReceiptHandle
*/
await sqsClient.send(new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: record.receiptHandle
}))
} catch (error) {
console.error('Error processing egress event:', error)
}
}

return {
statusCode: 200,
body: 'Egress events processed successfully'
}
})

/**
* Sends a record usage to Stripe for a given egress event.
* It uses the Stripe API v2023-10-16 to create a usage record for the given subscription item and quantity.
* The response is checked to ensure the usage record was created successfully.
*
* @param {import('stripe').Stripe} stripe
* @param {import('../data/egress.js').EgressEvent} egressEvent
* @returns {Promise<import('@ucanto/interface').Result<boolean, Error>>}
*/
async function sendRecordUsageToStripe(stripe, egressEvent) {
const subscriptionItem = {
id: 'sub_123', // FIXME (fforbeck):
// Where do we get this from?
// Should be in the event?
// Should we find it in the Stripe API using the customerId?
}
const response = await stripe.subscriptionItems.createUsageRecord(
subscriptionItem.id,
{
quantity: 1, // always 1 for each egress event
timestamp: egressEvent.timestamp.getTime()
}
)
if (response.object === 'usage_record') {
return { ok: true }
}
return { error: new Error('Failed to send record usage to Stripe') }
}
31 changes: 23 additions & 8 deletions billing/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export interface CustomerKey {
customer: CustomerDID
}

export interface CustomerListOptions extends Pageable {}
export interface CustomerListOptions extends Pageable { }

export type CustomerStore =
& StoreGetter<CustomerKey, Customer>
Expand Down Expand Up @@ -133,6 +133,21 @@ export interface UsageListKey { customer: CustomerDID, from: Date }

export type UsageStore = StorePutter<Usage>


/**
* The event that is emitted when egress traffic is detected.
*/
export interface EgressEvent {
customerId: string
resourceId: string
timestamp: Date
}


export interface EgressEventListKey { customerId: string, resourceId: string, from: Date }

export type EgressEventStore = StorePutter<EgressEvent> & StoreLister<EgressEventListKey, EgressEvent>

// Billing queues /////////////////////////////////////////////////////////////

/**
Expand Down Expand Up @@ -188,7 +203,7 @@ export interface ConsumerListKey { consumer: ConsumerDID }

export type ConsumerStore =
& StoreGetter<ConsumerKey, Consumer>
& StoreLister<ConsumerListKey, Pick<Consumer, 'consumer'|'provider'|'subscription'>>
& StoreLister<ConsumerListKey, Pick<Consumer, 'consumer' | 'provider' | 'subscription'>>

export interface Subscription {
customer: CustomerDID
Expand All @@ -205,7 +220,7 @@ export interface SubscriptionListKey { customer: CustomerDID }

export type SubscriptionStore =
& StoreGetter<SubscriptionKey, Subscription>
& StoreLister<SubscriptionListKey, Pick<Subscription, 'customer'|'provider'|'subscription'|'cause'>>
& StoreLister<SubscriptionListKey, Pick<Subscription, 'customer' | 'provider' | 'subscription' | 'cause'>>

// UCAN invocation ////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -302,7 +317,7 @@ export interface InsufficientRecords extends Failure {
/** StorePutter allows a single item to be put in the store by it's key. */
export interface StorePutter<T> {
/** Puts a single item into the store by it's key */
put: (rec: T) => Promise<Result<Unit, EncodeFailure|StoreOperationFailure|Failure>>
put: (rec: T) => Promise<Result<Unit, EncodeFailure | StoreOperationFailure | Failure>>
}

/**
Expand All @@ -316,23 +331,23 @@ export interface StoreBatchPutter<T> {
* not transactional. A failure may mean 1 or more records succeeded to
* be written.
*/
batchPut: (rec: Iterable<T>) => Promise<Result<Unit, InsufficientRecords|EncodeFailure|StoreOperationFailure|Failure>>
batchPut: (rec: Iterable<T>) => Promise<Result<Unit, InsufficientRecords | EncodeFailure | StoreOperationFailure | Failure>>
}

/** StoreGetter allows a single item to be retrieved by it's key. */
export interface StoreGetter<K extends {}, V> {
/** Gets a single item by it's key. */
get: (key: K) => Promise<Result<V, EncodeFailure|RecordNotFound<K>|DecodeFailure|StoreOperationFailure>>
get: (key: K) => Promise<Result<V, EncodeFailure | RecordNotFound<K> | DecodeFailure | StoreOperationFailure>>
}

/** StoreLister allows items in the store to be listed page by page. */
export interface StoreLister<K extends {}, V> {
/** Lists items in the store. */
list: (key: K, options?: Pageable) => Promise<Result<ListSuccess<V>, EncodeFailure|DecodeFailure|StoreOperationFailure>>
list: (key: K, options?: Pageable) => Promise<Result<ListSuccess<V>, EncodeFailure | DecodeFailure | StoreOperationFailure>>
}

/** QueueAdder allows messages to be added to the end of the queue. */
export interface QueueAdder<T> {
/** Adds a message to the end of the queue. */
add: (message: T) => Promise<Result<Unit, EncodeFailure|QueueOperationFailure|Failure>>
add: (message: T) => Promise<Result<Unit, EncodeFailure | QueueOperationFailure | Failure>>
}
9 changes: 9 additions & 0 deletions billing/queues/egress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { createQueueAdderClient } from './client.js'
import { encodeStr, validate } from '../data/egress.js'

/**
* @param {{ region: string } | import('@aws-sdk/client-sqs').SQSClient} conf
* @param {{ url: URL }} context
*/
export const createEgressEventQueue = (conf, { url }) =>
createQueueAdderClient(conf, { url, encode:encodeStr, validate })
33 changes: 33 additions & 0 deletions billing/tables/egress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { createStorePutterClient, createStoreListerClient } from './client.js'
import { validate, encode, lister, decode } from '../data/egress.js'

/**
* Stores egress events for tracking requests served to customers.
*
* @type {import('sst/constructs').TableProps}
*/
export const egressTableProps = {
fields: {
/** Composite key with format: "customerId" */
pk: 'string',
/** Composite key with format: "timestamp#customerId#resourceId" */
sk: 'string',
/** Customer DID (did:mailto:...). */
customerId: 'string',
/** Resource CID. */
resourceId: 'string',
/** ISO timestamp of the event. */
timestamp: 'string',
},
primaryIndex: { partitionKey: 'pk', sortKey: 'sk' }
}

/**
* @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf
* @param {{ tableName: string }} context
* @returns {import('../lib/api.js').EgressEventStore}
*/
export const createEgressEventStore = (conf, { tableName }) => ({
...createStorePutterClient(conf, { tableName, validate, encode }),
...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode })
})
45 changes: 44 additions & 1 deletion billing/test/helpers/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { encode as encodeSubscription, validate as validateSubscription } from '../../data/subscription.js'
import { encode as encodeConsumer, validate as validateConsumer } from '../../data/consumer.js'
import { decode as decodeUsage, lister as usageLister } from '../../data/usage.js'
import { encode as encodeEgressEvent, decodeStr as decodeEgressEvent, validate as validateEgressEvent } from '../../data/egress.js'

Check failure on line 9 in billing/test/helpers/context.js

View workflow job for this annotation

GitHub Actions / Test

'encodeEgressEvent' is declared but its value is never read.

Check failure on line 9 in billing/test/helpers/context.js

View workflow job for this annotation

GitHub Actions / Test

'validateEgressEvent' is declared but its value is never read.

Check failure on line 9 in billing/test/helpers/context.js

View workflow job for this annotation

GitHub Actions / Test

'encodeEgressEvent' is declared but its value is never read.

Check failure on line 9 in billing/test/helpers/context.js

View workflow job for this annotation

GitHub Actions / Test

'validateEgressEvent' is declared but its value is never read.
import { createCustomerBillingQueue } from '../../queues/customer.js'
import { createSpaceBillingQueue } from '../../queues/space.js'
import { consumerTableProps, subscriptionTableProps } from '../../../upload-api/tables/index.js'
Expand All @@ -16,7 +17,9 @@
import { createSpaceSnapshotStore, spaceSnapshotTableProps } from '../../tables/space-snapshot.js'
import { createUsageStore, usageTableProps } from '../../tables/usage.js'
import { createQueueRemoverClient } from './queue.js'

import { createEgressEventQueue } from '../../queues/egress.js'
import { egressTableProps, createEgressEventStore } from '../../tables/egress.js'
import { handler as createEgressTrafficHandler } from '../../functions/egress-traffic-handler.js'
/**
* @typedef {{
* dynamo: import('./aws').AWSService<import('@aws-sdk/client-dynamodb').DynamoDBClient>
Expand Down Expand Up @@ -137,6 +140,46 @@
return { consumerStore, spaceDiffStore }
}

export const createEgressTrafficQueueTestContext = async () => {
await createAWSServices()

const egressTableName = await createTable(awsServices.dynamo.client, egressTableProps, 'egress-')
const store = createEgressEventStore(awsServices.dynamo.client, { tableName: egressTableName })
const egressEventStore = {
put: store.put,
list: store.list,
}

const egressQueueURL = new URL(await createQueue(awsServices.sqs.client, 'egress-traffic-'))
const egressQueue = {
add: createEgressEventQueue(awsServices.sqs.client, { url: egressQueueURL }).add,
remove: createQueueRemoverClient(awsServices.sqs.client, { url: egressQueueURL, decode: decodeEgressEvent }).remove,
}

return {
egressEventStore,
egressQueue,
egressHandler: createEgressTrafficHandler,
egressTable: egressTableName,
queueUrl: egressQueueURL,
accountId: (await awsServices.dynamo.client.config.credentials()).accountId,
callbackWaitsForEmptyEventLoop: true,
functionName: 'your-function-name',
functionVersion: 'your-function-version',
region: awsServices.dynamo.client.config.region,
invokedFunctionArn: `arn:aws:lambda:${awsServices.dynamo.client.config.region}:${awsServices.dynamo.client.config.credentials().accountId}:function:your-function-name`,

Check failure on line 170 in billing/test/helpers/context.js

View workflow job for this annotation

GitHub Actions / Test

Property 'accountId' does not exist on type 'Promise<AwsCredentialIdentity>'.

Check failure on line 170 in billing/test/helpers/context.js

View workflow job for this annotation

GitHub Actions / Test

Property 'accountId' does not exist on type 'Promise<AwsCredentialIdentity>'.
memoryLimitInMB: '128',
awsRequestId: 'your-request-id',
logGroupName: '/aws/lambda/your-function-name',
logStreamName: 'your-log-stream',
getRemainingTimeInMillis: () => 1000,
done: () => {},
fail: () => {},
succeed: () => {},
stripeSecretKey: "", // FIXME (fforbeck): how to get Stripe secret key in a test?
}
}

/**
* @template C
* @param {import('../lib/api').TestSuite<C>} suite
Expand Down
Loading
Loading