diff --git a/p2p/transport/webrtc/datachannel.go b/p2p/transport/webrtc/datachannel.go index e2900d1e69..050e74aea7 100644 --- a/p2p/transport/webrtc/datachannel.go +++ b/p2p/transport/webrtc/datachannel.go @@ -76,87 +76,14 @@ func newDataChannel( return result } -// func (d *dataChannel) handleMessage(msg webrtc.DataChannelMessage) { -// if msg.IsString { -// log.Warnf("received string message") -// return -// } - -// var pbmsg pb.Message -// if err := pbmsg.Unmarshal(msg.Data); err != nil { -// log.Warnf("could not unmarshal protobuf message") -// return -// } - -// if !d.isRemoteWriteClosed() && !d.isLocalReadClosed() { -// d.m.Lock() -// d.readBuf.Write(pbmsg.GetMessage()) -// // n, err := d.readBuf.Write(pbmsg.GetMessage()) -// // log.Warnf("wrote %d bytes to buffer, msg size: %d: %v", n, len(pbmsg.GetMessage()), err) -// d.m.Unlock() -// select { -// case d.readSignal <- struct{}{}: -// default: -// } -// } - -// if pbmsg.Flag != nil { -// switch pbmsg.GetFlag() { -// case pb.Message_FIN: -// atomic.StoreUint32(&d.remoteWriteClosed, 1) -// select { -// case <-d.readSignal: -// default: -// close(d.readSignal) -// } - -// case pb.Message_STOP_SENDING: -// atomic.StoreUint32(&d.remoteReadClosed, 1) -// case pb.Message_RESET: -// log.Errorf("remote reset") -// d.Close() -// } -// } - -// } - -// func (d *dataChannel) Read(b []byte) (int, error) { -// for { -// select { -// case <-d.readDeadline.wait(): -// return 0, os.ErrDeadlineExceeded -// default: -// } - -// d.m.Lock() -// read, err := d.readBuf.Read(b) -// d.m.Unlock() -// if err == io.EOF && d.isRemoteWriteClosed() { -// return read, io.EOF -// } -// // log.Warnf("read %d bytes: %s", read, string(b[:read])) -// if read > 0 { -// return read, nil -// } - -// // log.Warnf("waiting for read") -// select { -// case <-d.readSignal: -// case <-d.ctx.Done(): -// return 0, d.ctx.Err() -// case <-d.readDeadline.wait(): -// return 0, os.ErrDeadlineExceeded -// } -// } -// } - func (d *dataChannel) processControlMessage(msg pb.Message) { switch msg.GetFlag() { case pb.Message_FIN: atomic.StoreUint32(&d.remoteWriteClosed, 1) case pb.Message_STOP_SENDING: atomic.StoreUint32(&d.remoteReadClosed, 1) - // TODO: Process reset + case pb.Message_RESET: + atomic.StoreUint32(&d.remoteWriteClosed, 1) } } @@ -262,16 +189,10 @@ func (d *dataChannel) RemoteAddr() net.Addr { func (d *dataChannel) Reset() error { var err error d.resetOnce.Do(func() { + // does reset mean that no more data will be sent? + atomic.StoreUint32(&d.localWriteClosed, 1) msg := &pb.Message{Flag: pb.Message_RESET.Enum()} - data, err := msg.Marshal() - if err != nil { - return - } - err = d.channel.Send(data) - if err != nil { - return - } - d.channel.Close() + err = d.writer.WriteMsg(msg) }) return err }