forked from go-mysql-org/go-mysql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync.go
133 lines (109 loc) · 2.93 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package canal
import (
"time"
"golang.org/x/net/context"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
)
func (c *Canal) startSyncBinlog() error {
pos := mysql.Position{c.master.Name, c.master.Position}
log.Infof("start sync binlog at %v", pos)
s, err := c.syncer.StartSync(pos)
if err != nil {
return errors.Errorf("start sync replication at %v error %v", pos, err)
}
timeout := time.Second
forceSavePos := false
for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ev, err := s.GetEvent(ctx)
cancel()
if err == context.DeadlineExceeded {
timeout = 2 * timeout
continue
}
if err != nil {
return errors.Trace(err)
}
timeout = time.Second
//next binlog pos
pos.Pos = ev.Header.LogPos
forceSavePos = false
switch e := ev.Event.(type) {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
// r.ev <- pos
forceSavePos = true
log.Infof("rotate binlog to %v", pos)
case *replication.RowsEvent:
// we only focus row based event
if err = c.handleRowsEvent(ev); err != nil {
log.Errorf("handle rows event error %v", err)
return errors.Trace(err)
}
case *replication.TableMapEvent:
continue
case *replication.FormatDescriptionEvent:
continue
default:
}
c.master.Update(pos.Name, pos.Pos)
c.master.Save(forceSavePos)
}
return nil
}
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
ev := e.Event.(*replication.RowsEvent)
// Caveat: table may be altered at runtime.
schema := string(ev.Table.Schema)
table := string(ev.Table.Table)
t, err := c.GetTable(schema, table)
if err != nil {
return errors.Trace(err)
}
var action string
switch e.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
action = InsertAction
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
action = DeleteAction
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
action = UpdateAction
default:
return errors.Errorf("%s not supported now", e.Header.EventType)
}
events := newRowsEvent(t, action, ev.Rows)
return c.travelRowsEventHandler(events)
}
func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error {
if timeout <= 0 {
timeout = 60
}
timer := time.NewTimer(time.Duration(timeout) * time.Second)
for {
select {
case <-timer.C:
return errors.Errorf("wait position %v err", pos)
default:
curpos := c.master.Pos()
if curpos.Compare(pos) >= 0 {
return nil
} else {
time.Sleep(100 * time.Millisecond)
}
}
}
return nil
}
func (c *Canal) CatchMasterPos(timeout int) error {
rr, err := c.Execute("SHOW MASTER STATUS")
if err != nil {
return errors.Trace(err)
}
name, _ := rr.GetString(0, 0)
pos, _ := rr.GetInt(0, 1)
return c.WaitUntilPos(mysql.Position{name, uint32(pos)}, timeout)
}