Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #5 from little-bear-labs/ckousik/con-432
Browse files Browse the repository at this point in the history
intitial stream logic
  • Loading branch information
ckousik authored Aug 5, 2022
2 parents 6be6364 + 8defcf8 commit 340e32a
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 38 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.0",
"abortable-iterator": "^4.0.2",
"it-merge": "^1.0.4",
"p-defer": "^4.0.0",
"socket.io-client": "^4.1.2",
"uuid": "^8.3.2"
}
Expand Down
169 changes: 131 additions & 38 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,70 +1,163 @@
import { Stream } from '@libp2p/interface-connection';
import { StreamStat } from '@libp2p/interface-connection';
import { logger } from '@libp2p/logger';
// import { logger } from '@libp2p/logger';
import { Source } from 'it-stream-types';
import { Sink } from 'it-stream-types';
import { pushable, Pushable } from 'it-pushable';
import defer, { DeferredPromise } from 'p-defer';
import merge from 'it-merge';

const log = logger('libp2p:webrtc:connection');
// const log = logger('libp2p:webrtc:connection');

type StreamInitOpts = {
channel: RTCDataChannel;
metadata?: Record<string, any>;
stat: StreamStat;
};

export class WebRTCStream implements Stream {
constructor() {
this.id = 'TODO';
this.stat = {
direction: 'outbound',
timeline: {
open: 0,
close: 0,
/**
* Unique identifier for a stream
*/
id: string;

/**
* Stats about this stream
*/
stat: StreamStat;

/**
* User defined stream metadata
*/
metadata: Record<string, any>;
private readonly channel: RTCDataChannel;

source: Source<Uint8Array> = pushable();
sink: Sink<Uint8Array, Promise<void>>;

// promises
opened: DeferredPromise<void> = defer();
closeWritePromise: DeferredPromise<void> = defer();
writeClosed: boolean = false;
readClosed: boolean = false;
closed: boolean = false;

constructor(opts: StreamInitOpts) {
this.channel = opts.channel;
this.id = this.channel.label;

this.stat = opts.stat;
switch (this.channel.readyState) {
case 'open':
this.opened.resolve();
break;
case 'closed':
case 'closing':
this.closed = true;
if (!this.stat.timeline.close) {
this.stat.timeline.close = new Date().getTime();
}
this.opened.resolve();
break;
}

this.metadata = opts.metadata ?? {};

// closable sink
this.sink = this._sinkFn;

// handle RTCDataChannel events
this.channel.onopen = (_evt) => {
this.stat.timeline.open = new Date().getTime();
this.opened.resolve();
};

this.channel.onmessage = ({ data }) => {
if (this.readClosed || this.closed) {
return;
}
(this.source as Pushable<Uint8Array>).push(data);
};

this.channel.onclose = (_evt) => {
this.close();
};

this.channel.onerror = (evt) => {
let err = (evt as RTCErrorEvent).error;
this.abort(err);
};
}

private async _sinkFn(src: Source<Uint8Array>): Promise<void> {
await this.opened.promise;
if (closed || this.writeClosed) {
return;
}

let self = this;
let closeWriteIterable = {
async *[Symbol.asyncIterator]() {
await self.closeWritePromise.promise;
yield new Uint8Array(0);
},
};
this.metadata = {};
this.sink = (x) => new Promise((res, rej) => {}); //TODO
if (this.dataChannel) {
log('TODO', this.dataChannel.id);

for await (const buf of merge(closeWriteIterable, src)) {
if (closed || this.writeClosed) {
break;
}
this.channel.send(buf);
}
}

/**
* Close a stream for reading and writing
*/
close(): void {}
close(): void {
if (this.closed) {
return;
}
this.stat.timeline.close = new Date().getTime();
this.closed = true;
this.closeRead();
this.closeWrite();
this.channel.close();
}

/**
* Close a stream for reading only
*/
closeRead(): void {}
closeRead(): void {
this.readClosed = true;
(this.source as Pushable<Uint8Array>).end();
if (this.readClosed && this.writeClosed) {
this.close();
}
}

/**
* Close a stream for writing only
*/
closeWrite(): void {}
closeWrite(): void {
this.writeClosed = true;
this.closeWritePromise.resolve();
if (this.readClosed && this.writeClosed) {
this.close();
}
}

/**
* Call when a local error occurs, should close the stream for reading and writing
*/
abort(err: Error): void {}
abort(err: Error): void {
this.close();
}

/**
* Call when a remote error occurs, should close the stream for reading and writing
*/
reset(): void {}

/**
* Unique identifier for a stream
*/
id: string;

/**
* Stats about this stream
*/
stat: StreamStat;

/**
* User defined stream metadata
*/
metadata: Record<string, any>;

source: Source<Uint8Array> = process.stdin; //TODO
sink: Sink<Uint8Array, Promise<void>>;

private dataChannel?: RTCDataChannel;
reset(): void {
this.close();
}
}

0 comments on commit 340e32a

Please sign in to comment.