-
Notifications
You must be signed in to change notification settings - Fork 0
/
kfk_producter.go
148 lines (130 loc) · 3.83 KB
/
kfk_producter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package kfktool
import (
"github.com/Shopify/sarama"
)
// ConfProducter ...
type ConfProducter []MyProducter
// MyProducter ...
type MyProducter struct {
Alias string `yaml:"alias"`
Brokers []string `yaml:"brokers"`
Sync bool `yaml:"sync"`
WaitAck sarama.RequiredAcks `yaml:"wait_ack"`
SuccLog bool `yaml:"succ_log"`
Debug bool `yaml:"debug"`
SyncProducter sarama.SyncProducer
AsyncProducer sarama.AsyncProducer
log *Logger
}
// Dial ...
func (p *MyProducter) Dial() error {
// p.Lock.Lock()
// defer p.Lock.Unlock()
p.Close()
config := sarama.NewConfig()
// 等待ACK的机制
config.Producer.RequiredAcks = p.WaitAck
config.Producer.Partitioner = sarama.NewHashPartitioner
// 错误通道必须开启使用
config.Producer.Return.Errors = true
client, err := sarama.NewClient(p.Brokers, config)
if err != nil {
return err
}
if p.Sync {
// 同步方式必须要有return.successes = true
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return err
}
p.log = NewLogger("", p.Alias, true, "only_console")
p.log.debug = p.Debug
p.SyncProducter = producer
} else {
// 异步的方式自己决定是否使用return.success
config.Producer.Return.Successes = p.SuccLog
producer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
return err
}
p.AsyncProducer = producer
// 默认日志接收器
p.log = NewLogger("", p.Alias, true, "only_console")
p.log.debug = p.Debug
// 如果创建完成异步发送器后 需要对日志进行收集
go func(mp *MyProducter) {
for {
select {
case succMsg := <-p.AsyncProducer.Successes():
p.log.PInfo("publish succ at %d partition %d offset text %s", succMsg.Partition, succMsg.Offset, succMsg.Value)
case errMsg := <-p.AsyncProducer.Errors():
p.log.PErr("publish failed at %d partition %d offset text %s reason %s", errMsg.Msg.Partition, errMsg.Msg.Offset, errMsg.Msg.Value, errMsg.Err.Error())
}
}
}(p)
}
return nil
}
// Close ...
func (p *MyProducter) Close() {
if p.SyncProducter != nil {
p.SyncProducter.Close()
p.SyncProducter = nil
}
if p.AsyncProducer != nil {
p.AsyncProducer.Close()
p.AsyncProducer = nil
}
}
// Publish ... 发消息
func (p *MyProducter) Publish(topic, value string, key ...string) (partition int32, offset int64, err error) {
if p.Sync {
partition, offset, err = p.syncPublish(topic, value, key)
} else {
p.asyncPublish(topic, value, key)
}
return
}
func (p *MyProducter) syncPublish(topic, value string, key []string) (partition int32, offset int64, err error) {
if len(key) > 0 {
partition, offset, err = p.SyncProducter.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key[0]),
Value: sarama.StringEncoder(value),
})
} else {
partition, offset, err = p.SyncProducter.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(value),
})
}
if err != nil {
// 记错误日志
p.log.PErr("publish failed at %d partition %d offset text %s reason %s", partition, offset, value, err.Error())
return
}
// 如果要打成功日志
if p.SuccLog {
p.log.PInfo("publish succ at %d partition %d offset text %s", partition, offset, value)
}
return
}
func (p *MyProducter) asyncPublish(topic, value string, key []string) {
if len(key) > 0 {
p.AsyncProducer.Input() <- &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key[0]),
Value: sarama.StringEncoder(value),
}
} else {
p.AsyncProducer.Input() <- &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(value),
}
}
}
// SetLogger ... 使用自定的日志器
func (p *MyProducter) SetLogger(logger *Logger) {
p.log = logger
}