Skip to content

Commit

Permalink
feat: realtime event
Browse files Browse the repository at this point in the history
  • Loading branch information
nichenqin committed May 29, 2024
1 parent efbed0b commit e40727e
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 37 deletions.
18 changes: 14 additions & 4 deletions apps/backend/src/routes/realtime.route.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import cron, { Patterns } from "@elysiajs/cron"
import Stream from "@elysiajs/stream"
import { inject, singleton } from "@undb/di"
import { BaseEvent } from "@undb/domain"
import { PubSubContext, ReplyService, RxJSPubSub } from "@undb/realtime"
import Elysia from "elysia"
import Elysia, { t } from "elysia"

@singleton()
export class RealtimeRoute {
constructor(
@inject(PubSubContext)
private readonly pubsub: PubSubContext,
private readonly pubsub: PubSubContext<BaseEvent>,
@inject(RxJSPubSub)
rxjsPubSub: RxJSPubSub,
rxjsPubSub: RxJSPubSub<BaseEvent>,
@inject(ReplyService)
private readonly reply: ReplyService,
) {
Expand All @@ -28,6 +29,15 @@ export class RealtimeRoute {
},
}),
)
.get("/sse", () => new Stream(this.pubsub.subscribe("record.*")))
.get(
"/api/tables/:tableId/subscription",
(ctx) => {
const tableId = ctx.params.tableId
return new Stream(this.pubsub.subscribe(`tenant.${tableId}.record.*`))
},
{
params: t.Object({ tableId: t.String() }),
},
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
</script>

<ScrollArea class="h-full w-full bg-gray-100 p-6 shadow-inner">
<div class="bg-background mx-auto max-w-[600px] space-y-2 rounded-md px-8 py-4 shadow-md" data-form-id={form.id}>
<div class="bg-background mx-auto max-w-[660px] space-y-2 rounded-md px-8 py-4 shadow-md" data-form-id={form.id}>
{#if isEditingFormName}
<input
class="text-4xl font-extrabold tracking-tight"
Expand Down
Binary file modified bun.lockb
Binary file not shown.
2 changes: 1 addition & 1 deletion packages/persistence/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ export type { Database } from "./db"
export { injectDb } from "./db.provider"
export * from "./record"
export * from "./table"
export { sessionTable, tables, users, outbox } from "./tables"
export * from "./tables"

export * from "./uow"
2 changes: 1 addition & 1 deletion packages/persistence/src/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const outbox = sqliteTable("outbox", {
id: text("id").notNull().primaryKey(),
payload: text("payload", { mode: "json" }).notNull(),
meta: text("meta", { mode: "json" }),
timestamp: integer("timestamp", { mode: "timestamp" }).notNull(),
timestamp: integer("timestamp", { mode: "timestamp_ms" }).notNull(),
operatorId: text("operator_id").notNull(),
name: text("name").notNull(),
})
Expand Down
3 changes: 2 additions & 1 deletion packages/realtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"@undb/di": "workspace:*",
"@undb/persistence": "workspace:*",
"@undb/table": "workspace:*",
"drizzle-orm": "^0.30.10"
"drizzle-orm": "^0.30.10",
"ts-pattern": "^5.1.2"
},
"devDependencies": {
"@types/bun": "latest"
Expand Down
23 changes: 12 additions & 11 deletions packages/realtime/src/pubsub/pubsub.context.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
import { singleton } from "@undb/di"
import type { Topic } from "../reply/topic"

export interface Message {
topic: string
message: string
export interface Message<T> {
topic: Topic
message: T
}

export interface PubSub {
publish(topic: string, message: string): void
subscribe(topic: string): AsyncIterable<string>
export interface PubSub<T> {
publish(topic: Topic, message: T): void
subscribe(topic: Topic): AsyncIterable<T>
}

@singleton()
export class PubSubContext {
private pubSub!: PubSub
export class PubSubContext<T> {
private pubSub!: PubSub<T>

setPubSub(pubSub: PubSub): void {
setPubSub(pubSub: PubSub<T>): void {
this.pubSub = pubSub
}

publish(topic: string, message: string): void {
publish(topic: Topic, message: T): void {
this.pubSub.publish(topic, message)
}

subscribe(topic: string): AsyncIterable<string> {
subscribe(topic: Topic): AsyncIterable<T> {
return this.pubSub.subscribe(topic)
}
}
17 changes: 9 additions & 8 deletions packages/realtime/src/pubsub/rxjs.pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,30 @@ import { Subject, filter, map, type Observable } from "rxjs"
import { type Message, type PubSub } from "./pubsub.context"
import { singleton } from "@undb/di"
import { Glob } from "bun"
import type { Topic } from "../reply/topic"

@singleton()
export class RxJSPubSub implements PubSub {
private subject: Subject<Message>
export class RxJSPubSub<T> implements PubSub<T> {
private subject: Subject<Message<T>>

constructor() {
this.subject = new Subject<Message>()
this.subject = new Subject<Message<T>>()
}

publish(topic: string, message: string): void {
publish(topic: Topic, message: T): void {
this.subject.next({ topic, message })
}

subscribe(topic: string): AsyncIterable<string> {
subscribe(topic: Topic): AsyncIterable<T> {
const observable = this.subject.asObservable().pipe(
filter((msg: Message) => this.matchTopic(topic, msg)),
map((msg: Message) => msg.message),
filter((msg: Message<T>) => this.matchTopic(topic, msg)),
map((msg: Message<T>) => msg.message),
)

return this.toAsyncIterable(observable)
}

private matchTopic(topic: string, msg: Message) {
private matchTopic(topic: string, msg: Message<T>) {
return topic === "*" || new Glob(topic).match(msg.topic)
}

Expand Down
10 changes: 10 additions & 0 deletions packages/realtime/src/reply/reply-event.factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import type { BaseEvent, Option, IEventJSON } from "@undb/domain"
import type { Outbox } from "@undb/persistence"
import { RecordEventFactory } from "@undb/table"

export class ReplyEventFactory {
static from(outbox: Outbox): Option<BaseEvent> {
// TODO: just use date time timestamp
return RecordEventFactory.fromJSON({ ...outbox, timestamp: outbox.timestamp.toISOString() } as IEventJSON)
}
}
15 changes: 13 additions & 2 deletions packages/realtime/src/reply/reply.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,32 @@ import { injectDb, type Database } from "@undb/persistence"
import { outbox } from "@undb/persistence"
import { PubSubContext } from "../pubsub/pubsub.context"
import { inArray } from "drizzle-orm"
import { ReplyEventFactory } from "./reply-event.factory"
import type { BaseEvent } from "@undb/domain"
import { getTopic } from "./topic"

@singleton()
export class ReplyService {
constructor(
@injectDb()
private readonly db: Database,
@inject(PubSubContext)
private readonly pubsub: PubSubContext,
private readonly pubsub: PubSubContext<BaseEvent>,
) {}

public async scan() {
const outboxList = await this.db.select().from(outbox).limit(10)

for (const item of outboxList) {
this.pubsub.publish(item.name, JSON.stringify(item))
const event = ReplyEventFactory.from(item)
if (event.isNone()) continue

const evt = event.unwrap()

const topic = getTopic(evt)
if (topic.isNone()) continue

this.pubsub.publish(topic.unwrap(), evt)
}

if (outboxList.length > 0) {
Expand Down
15 changes: 15 additions & 0 deletions packages/realtime/src/reply/topic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Option, type BaseEvent } from "@undb/domain"
import { RecordCreatedEvent, RecordDeletedEvent } from "@undb/table"
import { match, P } from "ts-pattern"

export type Topic = `tenant.${string}.${string}`

export const getTopic = (event: BaseEvent): Option<Topic> => {
const topic = match(event)
.returnType<Topic | null>()
.with(P.instanceOf(RecordCreatedEvent), (e) => `tenant.${e.payload.tableId}.record.created`)
.with(P.instanceOf(RecordDeletedEvent), (e) => `tenant.${e.payload.tableId}.record.deleted`)
.otherwise(() => null)

return Option(topic)
}
2 changes: 2 additions & 0 deletions packages/table/src/modules/records/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export * from "./record-deleted.event"
export type IRecordEvent = RecordDeletedEvent | RecordCreatedEvent

export const RecordEvents = [RecordDeletedEvent, RecordCreatedEvent]

export * from "./record-event.factory"
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ export type IRecordCreatedEvent = z.infer<typeof recordCreatedEvent>
export class RecordCreatedEvent extends BaseEvent<IRecordCreatedEvent, typeof RECORD_CREATED_EVENT> {
name = RECORD_CREATED_EVENT

constructor(table: TableDo, record: RecordDO) {
super(
static create(table: TableDo, record: RecordDO) {
return new this(
{
id: record.id.value,
tableId: table.id.value,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BaseEvent } from "@undb/domain"
import { BaseEvent, type IEventJSON } from "@undb/domain"
import { z } from "@undb/zod"
import { tableId } from "../../../table-id.vo"
import type { TableDo } from "../../../table.do"
Expand All @@ -16,8 +16,8 @@ export type IRecordDeletedEvent = z.infer<typeof recordDeletedEvent>
export class RecordDeletedEvent extends BaseEvent<IRecordDeletedEvent, typeof RECORD_DELETED_EVENT> {
name = RECORD_DELETED_EVENT

constructor(table: TableDo, record: RecordDO) {
super(
static create(table: TableDo, record: RecordDO) {
return new this(
{
id: record.id.value,
tableId: table.id.value,
Expand Down
29 changes: 29 additions & 0 deletions packages/table/src/modules/records/events/record-event.factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Option, type IEventJSON } from "@undb/domain"
import {
RecordCreatedEvent,
RecordDeletedEvent,
type IRecordCreatedEvent,
type IRecordDeletedEvent,
type IRecordEvent,
} from "."
import { match } from "ts-pattern"

export class RecordEventFactory {
static fromJSON(event: IEventJSON): Option<IRecordEvent> {
const evt = match(event)
.returnType<IRecordEvent | null>()
.with(
{ name: "record.created" },
(event) =>
new RecordCreatedEvent(event.payload as IRecordCreatedEvent, event.meta, event.id, new Date(event.timestamp)),
)
.with(
{ name: "record.deleted" },
(event) =>
new RecordDeletedEvent(event.payload as IRecordDeletedEvent, event.meta, event.id, new Date(event.timestamp)),
)
.otherwise(() => null)

return Option(evt)
}
}
6 changes: 3 additions & 3 deletions packages/table/src/modules/records/record/record.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class RecordDO extends AggregateRoot<IRecordEvent> {
static create(table: TableDo, dto: ICreateRecordDTO) {
const record = new RecordDO(RecordIdVO.create(dto.id), RecordValuesVO.create(table, dto.values))

const event = new RecordCreatedEvent(table, record)
const event = RecordCreatedEvent.create(table, record)
record.addDomainEvent(event)

return record
Expand Down Expand Up @@ -52,7 +52,7 @@ export class RecordDO extends AggregateRoot<IRecordEvent> {

duplicate(table: TableDo): RecordDO {
const record = new RecordDO(RecordIdVO.create(), this.values.duplicate(table.schema.fieldMapById))
record.addDomainEvent(new RecordCreatedEvent(table, record))
record.addDomainEvent(RecordCreatedEvent.create(table, record))
return record
}

Expand Down Expand Up @@ -84,7 +84,7 @@ export class RecordDO extends AggregateRoot<IRecordEvent> {
}

delete(table: TableDo) {
const event = new RecordDeletedEvent(table, this)
const event = RecordDeletedEvent.create(table, this)
this.addDomainEvent(event)
}

Expand Down

0 comments on commit e40727e

Please sign in to comment.