Skip to content

Commit

Permalink
reset?
Browse files Browse the repository at this point in the history
  • Loading branch information
ckousik committed Oct 9, 2022
1 parent 559c292 commit 894e6a8
Showing 1 changed file with 5 additions and 84 deletions.
89 changes: 5 additions & 84 deletions p2p/transport/webrtc/datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,87 +76,14 @@ func newDataChannel(
return result
}

// func (d *dataChannel) handleMessage(msg webrtc.DataChannelMessage) {
// if msg.IsString {
// log.Warnf("received string message")
// return
// }

// var pbmsg pb.Message
// if err := pbmsg.Unmarshal(msg.Data); err != nil {
// log.Warnf("could not unmarshal protobuf message")
// return
// }

// if !d.isRemoteWriteClosed() && !d.isLocalReadClosed() {
// d.m.Lock()
// d.readBuf.Write(pbmsg.GetMessage())
// // n, err := d.readBuf.Write(pbmsg.GetMessage())
// // log.Warnf("wrote %d bytes to buffer, msg size: %d: %v", n, len(pbmsg.GetMessage()), err)
// d.m.Unlock()
// select {
// case d.readSignal <- struct{}{}:
// default:
// }
// }

// if pbmsg.Flag != nil {
// switch pbmsg.GetFlag() {
// case pb.Message_FIN:
// atomic.StoreUint32(&d.remoteWriteClosed, 1)
// select {
// case <-d.readSignal:
// default:
// close(d.readSignal)
// }

// case pb.Message_STOP_SENDING:
// atomic.StoreUint32(&d.remoteReadClosed, 1)
// case pb.Message_RESET:
// log.Errorf("remote reset")
// d.Close()
// }
// }

// }

// func (d *dataChannel) Read(b []byte) (int, error) {
// for {
// select {
// case <-d.readDeadline.wait():
// return 0, os.ErrDeadlineExceeded
// default:
// }

// d.m.Lock()
// read, err := d.readBuf.Read(b)
// d.m.Unlock()
// if err == io.EOF && d.isRemoteWriteClosed() {
// return read, io.EOF
// }
// // log.Warnf("read %d bytes: %s", read, string(b[:read]))
// if read > 0 {
// return read, nil
// }

// // log.Warnf("waiting for read")
// select {
// case <-d.readSignal:
// case <-d.ctx.Done():
// return 0, d.ctx.Err()
// case <-d.readDeadline.wait():
// return 0, os.ErrDeadlineExceeded
// }
// }
// }

func (d *dataChannel) processControlMessage(msg pb.Message) {
switch msg.GetFlag() {
case pb.Message_FIN:
atomic.StoreUint32(&d.remoteWriteClosed, 1)
case pb.Message_STOP_SENDING:
atomic.StoreUint32(&d.remoteReadClosed, 1)
// TODO: Process reset
case pb.Message_RESET:
atomic.StoreUint32(&d.remoteWriteClosed, 1)
}
}

Expand Down Expand Up @@ -262,16 +189,10 @@ func (d *dataChannel) RemoteAddr() net.Addr {
func (d *dataChannel) Reset() error {
var err error
d.resetOnce.Do(func() {
// does reset mean that no more data will be sent?
atomic.StoreUint32(&d.localWriteClosed, 1)
msg := &pb.Message{Flag: pb.Message_RESET.Enum()}
data, err := msg.Marshal()
if err != nil {
return
}
err = d.channel.Send(data)
if err != nil {
return
}
d.channel.Close()
err = d.writer.WriteMsg(msg)
})
return err
}
Expand Down

0 comments on commit 894e6a8

Please sign in to comment.