forked from micro/go-micro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
default.go
1883 lines (1601 loc) · 49.5 KB
/
default.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package network
import (
"errors"
"fmt"
"hash/fnv"
"io"
"math"
"math/rand"
"sort"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/v2/client"
cmucp "github.com/micro/go-micro/v2/client/mucp"
rtr "github.com/micro/go-micro/v2/client/selector/router"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/network/resolver/dns"
pbNet "github.com/micro/go-micro/v2/network/service/proto"
"github.com/micro/go-micro/v2/proxy"
"github.com/micro/go-micro/v2/router"
pbRtr "github.com/micro/go-micro/v2/router/service/proto"
"github.com/micro/go-micro/v2/server"
smucp "github.com/micro/go-micro/v2/server/mucp"
"github.com/micro/go-micro/v2/transport"
"github.com/micro/go-micro/v2/tunnel"
bun "github.com/micro/go-micro/v2/tunnel/broker"
tun "github.com/micro/go-micro/v2/tunnel/transport"
"github.com/micro/go-micro/v2/util/backoff"
pbUtil "github.com/micro/go-micro/v2/util/proto"
)
var (
// NetworkChannel is the name of the tunnel channel for passing network messages
NetworkChannel = "network"
// ControlChannel is the name of the tunnel channel for passing control messages
ControlChannel = "control"
// DefaultLink is the default network link; maskRoute stamps it on every advertised route
DefaultLink = "network"
// MaxConnections is the max number of network client connections.
// resolveNodes stops collecting resolver records once this many addresses are gathered.
MaxConnections = 3
// MaxPeerErrors is the max number of peer errors before we remove it from network graph
MaxPeerErrors = 3
)
var (
// ErrClientNotFound is returned when the client for a tunnel channel could not be found
ErrClientNotFound = errors.New("client not found")
// ErrPeerLinkNotFound is returned when the peer link could not be found in tunnel Links
ErrPeerLinkNotFound = errors.New("peer link not found")
// ErrPeerMaxExceeded is returned when a peer has reached its max error count limit
ErrPeerMaxExceeded = errors.New("peer max errors exceeded")
)
// network implements Network interface
type network struct {
// node is the network node; it is embedded so its methods are promoted to network
*node
// options configure the network
options Options
// router is the network router
router router.Router
// proxy is the network proxy
proxy proxy.Proxy
// tunnel is the network tunnel
tunnel tunnel.Tunnel
// server is the network server
server server.Server
// client is the network client
client client.Client
// tunClient is a map of tunnel channel clients
tunClient map[string]tunnel.Session
// peerLinks is a map of links for each peer; getRouteMetric looks links up by gateway address
peerLinks map[string]tunnel.Link
// RWMutex is held by Init while options are modified and by
// getRouteMetric while peerLinks is read
sync.RWMutex
// connected marks the network as connected
connected bool
// closed is received from by the network goroutines to learn that they must stop
closed chan bool
// discovered is signalled when a peer message shows that this node
// has been discovered by the network
discovered chan bool
}
// message is a network message: a transport message paired with the
// tunnel session it was received on
type message struct {
// msg is the transport message
msg *transport.Message
// session is the tunnel session the message arrived on
session tunnel.Session
}
// newNetwork builds a network node. It starts from DefaultOptions, applies
// opts on top, initialises the configured tunnel and router, and creates a
// server and a client that both communicate over the tunnel.
func newNetwork(opts ...Option) Network {
	// start from the defaults and let the caller override them
	options := DefaultOptions()
	for _, apply := range opts {
		apply(&options)
	}

	// derive a hashed address from the bind address and the node id
	h := fnv.New64()
	h.Write([]byte(options.Address + options.Id))
	hashed := fmt.Sprintf("%d", h.Sum64())

	// by default advertise the bind address and use the hashed address as the
	// peer address; an explicit advertise address overrides both
	advertise, peerAddress := options.Address, hashed
	if len(options.Advertise) > 0 {
		advertise, peerAddress = options.Advertise, options.Advertise
	}

	// the tunnel binds to the network bind address
	options.Tunnel.Init(tunnel.Address(options.Address))

	// the router shares the network id and uses the peer address
	options.Router.Init(
		router.Id(options.Id),
		router.Address(peerAddress),
	)

	// transport and broker both run on top of the tunnel
	tunTransport := tun.NewTransport(tun.WithTunnel(options.Tunnel))
	tunBroker := bun.NewBroker(bun.WithTunnel(options.Tunnel))

	// network server
	srv := smucp.NewServer(
		server.Id(options.Id),
		server.Address(peerAddress),
		server.Advertise(advertise),
		server.Name(options.Name),
		server.Transport(tunTransport),
		server.Broker(tunBroker),
	)

	// network client; it selects nodes via the network router
	selector := rtr.NewSelector(rtr.WithRouter(options.Router))
	cli := cmucp.NewClient(
		client.Broker(tunBroker),
		client.Transport(tunTransport),
		client.Selector(selector),
	)

	self := &node{
		id:      options.Id,
		address: peerAddress,
		peers:   make(map[string]*node),
		status:  newStatus(),
	}

	nw := &network{
		node:       self,
		options:    options,
		router:     options.Router,
		proxy:      options.Proxy,
		tunnel:     options.Tunnel,
		server:     srv,
		client:     cli,
		tunClient:  make(map[string]tunnel.Session),
		peerLinks:  make(map[string]tunnel.Link),
		discovered: make(chan bool, 1),
	}

	// give the node a reference back to the network that owns it
	nw.node.network = nw

	return nw
}
// Init applies opts to the network options while holding the write lock.
// It always returns nil.
func (n *network) Init(opts ...Option) error {
	n.Lock()
	defer n.Unlock()

	// TODO: maybe only allow reinit of certain opts
	for idx := range opts {
		opts[idx](&n.options)
	}

	return nil
}
// Options returns a copy of the network options taken under the read lock
func (n *network) Options() Options {
	n.RLock()
	defer n.RUnlock()

	return n.options
}
// Name returns the network name from the options, read under the read lock
func (n *network) Name() string {
	n.RLock()
	defer n.RUnlock()

	return n.options.Name
}
// acceptNetConn accepts connections from NetworkChannel and hands each one
// to handleNetConn running in its own goroutine; messages read from the
// connection are delivered on recv.
//
// A failed Accept is retried after a backoff that grows with the number of
// failures. The loop returns once the network has been closed, both on the
// error path (so a closed listener does not keep this goroutine retrying
// forever) and right after a successful accept, in which case the freshly
// accepted connection is closed.
func (n *network) acceptNetConn(l tunnel.Listener, recv chan *message) {
	var i int
	for {
		// accept a connection
		conn, err := l.Accept()
		if err != nil {
			// stop retrying once the network has been closed
			select {
			case <-n.closed:
				return
			default:
			}

			sleep := backoff.Do(i)
			if logger.V(logger.DebugLevel, logger.DefaultLogger) {
				logger.Debugf("Network tunnel [%s] accept error: %v, backing off for %v", NetworkChannel, err, sleep)
			}
			time.Sleep(sleep)
			i++
			continue
		}

		select {
		case <-n.closed:
			if err := conn.Close(); err != nil {
				if logger.V(logger.DebugLevel, logger.DefaultLogger) {
					logger.Debugf("Network tunnel [%s] failed to close connection: %v", NetworkChannel, err)
				}
			}
			return
		default:
			// go handle NetworkChannel connection
			go n.handleNetConn(conn, recv)
		}
	}
}
// acceptCtrlConn accepts connections from ControlChannel and hands each one
// to handleCtrlConn running in its own goroutine; messages read from the
// connection are delivered on recv.
//
// A failed Accept is retried after a backoff that grows with the number of
// failures. The loop returns once the network has been closed, both on the
// error path (so a closed listener does not keep this goroutine retrying
// forever) and right after a successful accept, in which case the freshly
// accepted connection is closed.
func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *message) {
	var i int
	for {
		// accept a connection
		conn, err := l.Accept()
		if err != nil {
			// stop retrying once the network has been closed
			select {
			case <-n.closed:
				return
			default:
			}

			sleep := backoff.Do(i)
			if logger.V(logger.DebugLevel, logger.DefaultLogger) {
				logger.Debugf("Network tunnel [%s] accept error: %v, backing off for %v", ControlChannel, err, sleep)
			}
			time.Sleep(sleep)
			i++
			continue
		}

		select {
		case <-n.closed:
			if err := conn.Close(); err != nil {
				if logger.V(logger.DebugLevel, logger.DefaultLogger) {
					logger.Debugf("Network tunnel [%s] failed to close connection: %v", ControlChannel, err)
				}
			}
			return
		default:
			// go handle ControlChannel connection
			go n.handleCtrlConn(conn, recv)
		}
	}
}
// maskRoute rewrites r in place before it is advertised: the gateway becomes
// this node's address, the link becomes DefaultLink and the metric is
// recalculated from the route's original router, gateway and link.
// The address of a route that originates at this node is replaced by a hash
// of the service name and this node's address, unless the address is "*".
func (n *network) maskRoute(r *pbRtr.Route) {
	// keep the service address unless the route is one of our own
	masked := r.Address

	if r.Router == n.Id() && r.Address != "*" {
		// routes for multiple instances of a service will be collapsed here.
		// TODO: once we store labels in the table this may need to change
		// to include the labels in case they differ but highly unlikely
		h := fnv.New64()
		h.Write([]byte(r.Service + n.Address()))
		masked = fmt.Sprintf("%d", h.Sum64())
	}

	// the metric must be computed from the values the route arrived with,
	// i.e. before Gateway and Link are overridden below
	metric := n.getRouteMetric(r.Router, r.Gateway, r.Link)

	r.Address, r.Gateway, r.Link, r.Metric = masked, n.Address(), DefaultLink, metric
}
// advertise forwards router adverts received on advertChan to the network.
//
// Every advert is converted to its protobuf form, with each route masked via
// maskRoute, and sent on ControlChannel to at most three distinct, randomly
// chosen peers. Send failures are only logged. The loop returns when the
// network is closed or when advertChan is closed.
func (n *network) advertise(advertChan <-chan *router.Advert) {
	// maxAdvertPeers is the number of peers each advert is sent to at most
	const maxAdvertPeers = 3

	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	for {
		select {
		// process local adverts and randomly fire them at other nodes
		case advert, ok := <-advertChan:
			// a closed channel never delivers again; without this check the
			// nil advert would be dereferenced below
			if !ok {
				return
			}
			if advert == nil {
				continue
			}

			// get a list of node peers
			peers := n.Peers()
			// skip the conversion work if there is no one to send to
			if len(peers) == 0 {
				continue
			}

			// create a proto advert
			events := make([]*pbRtr.Event, 0, len(advert.Events))
			for _, event := range advert.Events {
				// make a copy of the route so masking does not touch the original
				route := &pbRtr.Route{
					Service: event.Route.Service,
					Address: event.Route.Address,
					Gateway: event.Route.Gateway,
					Network: event.Route.Network,
					Router:  event.Route.Router,
					Link:    event.Route.Link,
					Metric:  event.Route.Metric,
				}

				// override the various values
				n.maskRoute(route)

				events = append(events, &pbRtr.Event{
					Type:      pbRtr.EventType(event.Type),
					Timestamp: event.Timestamp.UnixNano(),
					Route:     route,
				})
			}

			msg := &pbRtr.Advert{
				Id:        advert.Id,
				Type:      pbRtr.AdvertType(advert.Type),
				Timestamp: advert.Timestamp.UnixNano(),
				Events:    events,
			}

			// advertise to max 3 peers
			fanout := len(peers)
			if fanout > maxAdvertPeers {
				fanout = maxAdvertPeers
			}

			// walk a random permutation so that no peer is picked twice
			for _, idx := range rnd.Perm(len(peers))[:fanout] {
				peer := n.node.GetPeerNode(peers[idx].Id())
				if peer == nil {
					continue
				}
				if err := n.sendTo("advert", ControlChannel, peer, msg); err != nil {
					if logger.V(logger.DebugLevel, logger.DefaultLogger) {
						logger.Debugf("Network failed to advertise routes to %s: %v", peer.Id(), err)
					}
				}
			}
		case <-n.closed:
			return
		}
	}
}
// initNodes initializes the tunnel with the list of resolved nodes, leaving
// out the address this node's server advertises so the tunnel is not told to
// connect to itself.
//
// startup only affects error handling: a resolve error aborts the
// initialisation unless startup is true.
func (n *network) initNodes(startup bool) {
	nodes, err := n.resolveNodes()
	// NOTE: this condition never fires
	// as resolveNodes() never returns error
	if err != nil && !startup {
		if logger.V(logger.DebugLevel, logger.DefaultLogger) {
			logger.Debugf("Network failed to init nodes: %v", err)
		}
		return
	}

	// our current address
	advertised := n.server.Options().Advertise

	// strip self
	var peers []string
	for _, node := range nodes {
		if node == advertised {
			continue
		}
		peers = append(peers, node)
	}

	if logger.V(logger.TraceLevel, logger.DefaultLogger) {
		logger.Tracef("Network initialising nodes %+v\n", peers)
	}

	// initialize the tunnel with the filtered list, not the raw resolver output
	n.tunnel.Init(
		tunnel.Nodes(peers...),
	)
}
// resolveNodes resolves network nodes to a de-duplicated list of addresses.
//
// The network name is resolved with the configured resolver; the records are
// sorted by ascending priority and at most MaxConnections distinct addresses
// are taken from them. Each configured seed node is then resolved with the
// DNS resolver and every address not seen before is appended.
//
// Resolution failures are only logged, so the returned error is always nil.
func (n *network) resolveNodes() ([]string, error) {
	// resolve the network address to network nodes
	records, err := n.options.Resolver.Resolve(n.options.Name)
	if err != nil {
		if logger.V(logger.DebugLevel, logger.DefaultLogger) {
			logger.Debugf("Network failed to resolve nodes: %v", err)
		}
	} else {
		// sort by lowest priority
		sort.Slice(records, func(i, j int) bool { return records[i].Priority < records[j].Priority })
	}

	// seen tracks every address already added to nodes
	seen := make(map[string]bool)

	// collect network node addresses
	//nolint:prealloc
	var nodes []string

	for _, record := range records {
		if seen[record.Address] {
			continue
		}
		seen[record.Address] = true
		nodes = append(nodes, record.Address)

		// break once MaxConnections nodes has been reached
		if len(nodes) == MaxConnections {
			break
		}
	}

	// use the DNS resolver to expand peers
	dnsResolver := &dns.Resolver{}

	// append seed nodes if we have them
	for _, node := range n.options.Nodes {
		// resolve anything that looks like a host name
		seedRecords, err := dnsResolver.Resolve(node)
		if err != nil {
			if logger.V(logger.DebugLevel, logger.DefaultLogger) {
				logger.Debugf("Failed to resolve %v %v", node, err)
			}
			continue
		}

		for _, record := range seedRecords {
			if seen[record.Address] {
				continue
			}
			// record the address so a later seed resolving to the same
			// address is not appended a second time
			seen[record.Address] = true
			nodes = append(nodes, record.Address)
		}
	}

	return nodes, nil
}
// handleNetConn reads messages from a NetworkChannel session and forwards
// them on msg together with the session they arrived on.
//
// Messages whose Micro-Peer header names a different node are dropped.
// The session is closed and the loop ends on io.EOF or a tunnel read timeout;
// any other receive error is logged and the read is retried. The loop also
// ends when the network is closed while a message is waiting to be forwarded.
func (n *network) handleNetConn(s tunnel.Session, msg chan *message) {
	for {
		received := &transport.Message{}

		err := s.Recv(received)
		if err != nil {
			if logger.V(logger.DebugLevel, logger.DefaultLogger) {
				logger.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err)
			}
			if err == io.EOF || err == tunnel.ErrReadTimeout {
				s.Close()
				return
			}
			continue
		}

		// skip messages that are explicitly addressed to somebody else
		if target := received.Header["Micro-Peer"]; target != "" && target != n.options.Id {
			continue
		}

		envelope := &message{msg: received, session: s}

		select {
		case <-n.closed:
			return
		case msg <- envelope:
		}
	}
}
// handleCtrlConn reads messages from a ControlChannel session and forwards
// them on msg together with the session they arrived on.
//
// Messages whose Micro-Peer header names a different node are dropped.
// The session is closed and the loop ends on io.EOF or a tunnel read timeout;
// any other receive error is logged and the read is retried. The loop also
// ends when the network is closed while a message is waiting to be forwarded.
func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) {
	for {
		received := &transport.Message{}

		err := s.Recv(received)
		if err != nil {
			if logger.V(logger.DebugLevel, logger.DefaultLogger) {
				logger.Debugf("Network tunnel [%s] receive error: %v", ControlChannel, err)
			}
			if err == io.EOF || err == tunnel.ErrReadTimeout {
				s.Close()
				return
			}
			continue
		}

		// skip messages that are explicitly addressed to somebody else
		if target := received.Header["Micro-Peer"]; target != "" && target != n.options.Id {
			continue
		}

		envelope := &message{msg: received, session: s}

		select {
		case <-n.closed:
			return
		case msg <- envelope:
		}
	}
}
// getHopCount looks rtr up in the network graph and returns a distance weight
// for it. The value is a weight rather than a literal number of hops:
// - 1 when rtr is this node, i.e. the route is local
// - 10 when rtr is one of this node's peers
// - 100 when rtr is a peer of one of this node's peers
// - 1000 for anything further away
// The node read lock is held for the duration of the lookup.
func (n *network) getHopCount(rtr string) int {
	const (
		weightLocal    = 1
		weightPeer     = 10
		weightPeerPeer = 100
		weightBeyond   = 1000
	)

	// make sure node.peers are not modified
	n.node.RLock()
	defer n.node.RUnlock()

	switch _, adjacent := n.node.peers[rtr]; {
	case rtr == n.options.Id:
		// we are the origin of the route
		return weightLocal
	case adjacent:
		// the route origin is our peer
		return weightPeer
	}

	// the route origin may be the peer of one of our peers
	for _, peer := range n.node.peers {
		if _, known := peer.peers[rtr]; known {
			return weightPeerPeer
		}
	}

	// otherwise the origin is outside of our neighbourhood
	return weightBeyond
}
// getRouteMetric calculates the metric of a route from the status of the
// link to its gateway and from the hop count of its origin router.
//
//   - link "local" with an empty gateway yields 1
//   - link "local" with a non-empty gateway yields 2
//   - math.MaxInt64 is returned when no peer link is known for gateway
//   - otherwise the metric is (delay * length * hops) / 10e6, where delay and
//     length come from the peer link and hops from getHopCount; a zero delay
//     is treated as 1 and a zero length as 10e9
//
// The product is guarded against int64 overflow: when it does not fit, the
// metric is computed in floating point and clamped to math.MaxInt64 instead
// of wrapping around to a meaningless, possibly negative, value.
// The network read lock is held for the duration of the call.
func (n *network) getRouteMetric(router string, gateway string, link string) int64 {
	// divisor scales the delay*length*hops product down to the metric
	const divisor = 10e6

	n.RLock()
	defer n.RUnlock()

	if link == "local" {
		// local links are marked as 1
		if gateway == "" {
			return 1
		}
		// local links from other gateways as 2
		return 2
	}

	if logger.V(logger.TraceLevel, logger.DefaultLogger) {
		logger.Tracef("Network looking up %s link to gateway: %s", link, gateway)
	}

	// attempt to find link based on gateway address
	lnk, ok := n.peerLinks[gateway]
	if !ok {
		if logger.V(logger.DebugLevel, logger.DefaultLogger) {
			logger.Debugf("Network failed to find a link to gateway: %s", gateway)
		}
		// no link found so infinite metric returned
		return math.MaxInt64
	}

	delay := lnk.Delay()
	hops := n.getHopCount(router)
	length := lnk.Length()

	// make sure delay is non-zero
	if delay == 0 {
		delay = 1
	}

	// make sure length is non-zero
	if length == 0 {
		if logger.V(logger.DebugLevel, logger.DefaultLogger) {
			logger.Debugf("Link length is 0 %v %v", link, lnk.Length())
		}
		length = 10e9
	}

	// overflows reports whether the product of two positive values exceeds int64
	overflows := func(a, b int64) bool {
		return a > 0 && b > 0 && a > math.MaxInt64/b
	}

	var metric int64
	switch {
	case overflows(delay, length), overflows(delay*length, int64(hops)):
		// the exact product does not fit into int64; fall back to an
		// approximation and clamp it to the largest representable metric
		metric = math.MaxInt64
		if approx := float64(delay) * float64(length) * float64(hops) / divisor; approx < math.MaxInt64 {
			metric = int64(approx)
		}
	default:
		metric = (delay * length * int64(hops)) / divisor
	}

	if logger.V(logger.TraceLevel, logger.DefaultLogger) {
		logger.Tracef("Network calculated metric %v delay %v length %v distance %v", metric, delay, length, hops)
	}

	return metric
}
// processCtrlChan processes messages received on ControlChannel until the
// network is closed; the listener is closed on return.
//
// Only "advert" messages are acted upon. An advert is dropped when it cannot
// be unmarshalled, when it was sent by this node, or when the sender is not
// found in this node's peer topology. Individual events are dropped when
// their route originates at a router that is neither the sender nor found
// among the sender's peers. The metric of every remaining route is increased
// by the locally calculated metric (saturating at math.MaxInt64) and the
// resulting advert is handed to the router.
func (n *network) processCtrlChan(listener tunnel.Listener) {
	defer listener.Close()

	// queue of control messages filled by the connection handlers
	recv := make(chan *message, 128)

	// accept ControlChannel connections
	go n.acceptCtrlConn(listener, recv)

	for {
		select {
		case <-n.closed:
			return
		case m := <-recv:
			// adverts are the only message type handled on this channel
			if m.msg.Header["Micro-Method"] != "advert" {
				continue
			}

			pbAdvert := new(pbRtr.Advert)
			if err := proto.Unmarshal(m.msg.Body, pbAdvert); err != nil {
				if logger.V(logger.DebugLevel, logger.DefaultLogger) {
					logger.Debugf("Network fail to unmarshal advert message: %v", err)
				}
				continue
			}

			// don't process your own messages
			if pbAdvert.Id == n.options.Id {
				continue
			}

			if logger.V(logger.DebugLevel, logger.DefaultLogger) {
				logger.Debugf("Network received advert message from: %s", pbAdvert.Id)
			}

			// look up the advertising node in our peer topology;
			// adverts from nodes outside of it (MaxDepth) are skipped
			advertNode := n.node.GetPeerNode(pbAdvert.Id)
			if advertNode == nil {
				if logger.V(logger.DebugLevel, logger.DefaultLogger) {
					logger.Debugf("Network skipping advert message from unknown peer: %s", pbAdvert.Id)
				}
				continue
			}

			var events []*router.Event

			for _, event := range pbAdvert.Events {
				// for backwards compatibility reasons
				if event == nil || event.Route == nil {
					continue
				}
				pbRoute := event.Route

				// when the advertising node is not the origin of the route,
				// the origin must be one of its peers; otherwise we can't
				// rule out potential routing loops so we bail here
				if pbAdvert.Id != pbRoute.Router && advertNode.GetPeerNode(pbRoute.Router) == nil {
					if logger.V(logger.DebugLevel, logger.DefaultLogger) {
						logger.Debugf("Network skipping advert message from peer: %s", pbAdvert.Id)
					}
					continue
				}

				// calculate route metric to add to the advertised metric
				metric := n.getRouteMetric(pbRoute.Router, pbRoute.Gateway, pbRoute.Link)

				if logger.V(logger.TraceLevel, logger.DefaultLogger) {
					logger.Tracef("Network metric for router %s and gateway %s: %v", pbRoute.Router, pbRoute.Gateway, metric)
				}

				// a non-positive sum is treated as an int64 overflow
				// and replaced by the max int64 value
				combined := pbRoute.Metric + metric
				if combined <= 0 {
					combined = math.MaxInt64
				}

				// create router event
				events = append(events, &router.Event{
					Type:      router.EventType(event.Type),
					Timestamp: time.Unix(0, pbAdvert.Timestamp),
					Route: router.Route{
						Service: pbRoute.Service,
						Address: pbRoute.Address,
						Gateway: pbRoute.Gateway,
						Network: pbRoute.Network,
						Router:  pbRoute.Router,
						Link:    pbRoute.Link,
						Metric:  combined,
					},
				})
			}

			// if no events are eligible for processing continue
			if len(events) == 0 {
				if logger.V(logger.TraceLevel, logger.DefaultLogger) {
					logger.Tracef("Network no events to be processed by router: %s", n.options.Id)
				}
				continue
			}

			// create an advert and process it
			advert := &router.Advert{
				Id:        pbAdvert.Id,
				Type:      router.AdvertType(pbAdvert.Type),
				Timestamp: time.Unix(0, pbAdvert.Timestamp),
				TTL:       time.Duration(pbAdvert.Ttl),
				Events:    events,
			}

			if logger.V(logger.TraceLevel, logger.DefaultLogger) {
				logger.Tracef("Network router %s processing advert: %s", n.Id(), advert.Id)
			}

			err := n.router.Process(advert)
			if err != nil && logger.V(logger.DebugLevel, logger.DefaultLogger) {
				logger.Debugf("Network failed to process advert %s: %v", advert.Id, err)
			}
		}
	}
}
// processNetChan processes messages received on NetworkChannel
func (n *network) processNetChan(listener tunnel.Listener) {
defer listener.Close()
// receive network message queue
recv := make(chan *message, 128)
// accept NetworkChannel connections
go n.acceptNetConn(listener, recv)
for {
select {
case m := <-recv:
// switch on type of message and take action
switch m.msg.Header["Micro-Method"] {
case "connect":
// mark the time the message has been received
now := time.Now()
pbNetConnect := &pbNet.Connect{}
if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil {
logger.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err)
continue
}
// don't process your own messages
if pbNetConnect.Node.Id == n.options.Id {
continue
}
logger.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id)
peer := &node{
id: pbNetConnect.Node.Id,
address: pbNetConnect.Node.Address,
link: m.msg.Header["Micro-Link"],
peers: make(map[string]*node),
status: newStatus(),
lastSeen: now,
}
// update peer links
// TODO: should we do this only if we manage to add a peer
// What should we do if the peer links failed to be updated?
if err := n.updatePeerLinks(peer); err != nil {
logger.Debugf("Network failed updating peer links: %s", err)
}
// add peer to the list of node peers
if err := n.AddPeer(peer); err == ErrPeerExists {
logger.Tracef("Network peer exists, refreshing: %s", peer.id)
// update lastSeen time for the peer
if err := n.RefreshPeer(peer.id, peer.link, now); err != nil {
logger.Debugf("Network failed refreshing peer %s: %v", peer.id, err)
}
}
// we send the sync message because someone has sent connect
// and wants to either connect or reconnect to the network
// The faster it gets the network config (routes and peer graph)
// the faster the network converges to a stable state
go func() {
// get node peer graph to send back to the connecting node
node := PeersToProto(n.node, MaxDepth)
msg := &pbNet.Sync{
Peer: node,
}
// get a list of the best routes for each service in our routing table
routes, err := n.getProtoRoutes()
if err != nil {
logger.Debugf("Network node %s failed listing routes: %v", n.id, err)
}
// attached the routes to the message
msg.Routes = routes
// send sync message to the newly connected peer
if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil {
logger.Debugf("Network failed to send sync message: %v", err)
}
}()
case "peer":
// mark the time the message has been received
now := time.Now()
pbNetPeer := &pbNet.Peer{}
if err := proto.Unmarshal(m.msg.Body, pbNetPeer); err != nil {
logger.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err)
continue
}
// don't process your own messages
if pbNetPeer.Node.Id == n.options.Id {
continue
}
logger.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address)
peer := &node{
id: pbNetPeer.Node.Id,
address: pbNetPeer.Node.Address,
link: m.msg.Header["Micro-Link"],
peers: make(map[string]*node),
status: newPeerStatus(pbNetPeer),
lastSeen: now,
}
// update peer links
// TODO: should we do this only if we manage to add a peer
// What should we do if the peer links failed to be updated?
if err := n.updatePeerLinks(peer); err != nil {
logger.Debugf("Network failed updating peer links: %s", err)
}
// if it's a new peer i.e. we do not have it in our graph, we request full sync
if err := n.node.AddPeer(peer); err == nil {
go func() {
// marshal node graph into protobuf
node := PeersToProto(n.node, MaxDepth)
msg := &pbNet.Sync{
Peer: node,
}
// get a list of the best routes for each service in our routing table
routes, err := n.getProtoRoutes()
if err != nil {
logger.Debugf("Network node %s failed listing routes: %v", n.id, err)
}
// attached the routes to the message
msg.Routes = routes
// send sync message to the newly connected peer
if err := n.sendTo("sync", NetworkChannel, peer, msg); err != nil {
logger.Debugf("Network failed to send sync message: %v", err)
}
}()
continue
// if we already have the peer in our graph, skip further steps
} else if err != ErrPeerExists {
logger.Debugf("Network got error adding peer %v", err)
continue
}
logger.Tracef("Network peer exists, refreshing: %s", pbNetPeer.Node.Id)
// update lastSeen time for the peer
if err := n.RefreshPeer(peer.id, peer.link, now); err != nil {
logger.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err)
}
// NOTE: we don't unpack MaxDepth toplogy
peer = UnpackPeerTopology(pbNetPeer, now, MaxDepth-1)
// update the link
peer.link = m.msg.Header["Micro-Link"]
logger.Tracef("Network updating topology of node: %s", n.node.id)
if err := n.node.UpdatePeer(peer); err != nil {
logger.Debugf("Network failed to update peers: %v", err)
}
// tell the connect loop that we've been discovered
// so it stops sending connect messages out
select {
case n.discovered <- true:
default:
// don't block here
}
case "sync":
// record the timestamp of the message receipt
now := time.Now()
pbNetSync := &pbNet.Sync{}
if err := proto.Unmarshal(m.msg.Body, pbNetSync); err != nil {
logger.Debugf("Network tunnel [%s] sync unmarshal error: %v", NetworkChannel, err)
continue
}
// don't process your own messages
if pbNetSync.Peer.Node.Id == n.options.Id {
continue
}
logger.Debugf("Network received sync message from: %s", pbNetSync.Peer.Node.Id)
peer := &node{
id: pbNetSync.Peer.Node.Id,
address: pbNetSync.Peer.Node.Address,
link: m.msg.Header["Micro-Link"],
peers: make(map[string]*node),
status: newPeerStatus(pbNetSync.Peer),
lastSeen: now,
}
// update peer links
// TODO: should we do this only if we manage to add a peer
// What should we do if the peer links failed to be updated?
if err := n.updatePeerLinks(peer); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Network failed updating peer links: %s", err)
}
}
// add peer to the list of node peers
if err := n.node.AddPeer(peer); err == ErrPeerExists {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("Network peer exists, refreshing: %s", peer.id)
}
// update lastSeen time for the existing node
if err := n.RefreshPeer(peer.id, peer.link, now); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Network failed refreshing peer %s: %v", peer.id, err)
}
}
}
// when we receive a sync message we update our routing table
// and send a peer message back to the network to announce our presence
// add all the routes we have received in the sync message
for _, pbRoute := range pbNetSync.Routes {
// unmarshal the routes received from remote peer
route := pbUtil.ProtoToRoute(pbRoute)
// continue if we are the originator of the route
if route.Router == n.router.Options().Id {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Network node %s skipping route addition: route already present", n.id)
}
continue
}