Skip to content

Commit

Permalink
Updating kafka driver to use the new client connection helper.
Browse files Browse the repository at this point in the history
Added defaults for topics.

Signed-off-by: Vishnu kannan <vishnuk@google.com>
  • Loading branch information
vishh committed Nov 3, 2015
1 parent 0eaade5 commit 6725da6
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 47 deletions.
6 changes: 3 additions & 3 deletions docs/sink-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ Normally, kafka server has multi brokers, so brokers' list need be configured fo
So, we can set `KAFKA_SERVER_URL` to a dummy value, and provide kafka brokers' list in url's query string.
In addition, the following options can be set in the query string:

* `timeseriestopic` - Kafka's topic for timeseries
* `eventstopic` - Kafka's topic for events
* `timeseriestopic` - Kafka's topic for timeseries. Default: `heapster-metrics`
* `eventstopic` - Kafka's topic for events. Default: `heapster-events`

Like this:

--sink=kafka:http://kafka/?brokers=0.0.0.0:9092&brokers=0.0.0.0:9093&timeseriestopic=test&eventstopic=test
--sink="kafka:?brokers=0.0.0.0:9092&brokers=0.0.0.0:9093"

## Modifying the sinks at runtime

Expand Down
97 changes: 59 additions & 38 deletions sinks/kafka/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/optiopay/kafka/proto"
"k8s.io/heapster/extpoints"
sink_api "k8s.io/heapster/sinks/api"
sinkutil "k8s.io/heapster/sinks/util"
kube_api "k8s.io/kubernetes/pkg/api"
)

Expand All @@ -37,23 +38,27 @@ const (
brokerAllowTopicCreation = true
brokerLeaderRetryLimit = 1
brokerLeaderRetryWait = 0
timeSeriesTopic = "heapster-metrics"
eventsTopic = "heapster-events"
)

// kafkaSink publishes heapster metrics and events to Apache Kafka topics.
// It implements the sink_api.ExternalSink interface.
type kafkaSink struct {
	brokerConf      kafka.BrokerConf // broker settings reused for every Dial (setupClient and ping)
	producer        kafka.Producer   // created lazily by setupClient via the ClientInitializer
	timeSeriesTopic string           // topic receiving metric points
	eventsTopic     string           // topic receiving kubernetes events
	sinkBrokerHosts []string         // kafka broker host:port list parsed from the sink URL
	ci              sinkutil.ClientInitializer // retries client setup in the background; Done() gates stores
}

type kafkaSinkPoint struct {
// KafkaSinkPoint is the JSON payload produced to the timeseries topic
// for each metric point.
type KafkaSinkPoint struct {
	MetricsName      string
	MetricsValue     interface{}
	MetricsTimestamp time.Time
	MetricsTags      map[string]string
}

type kafkaSinkEvent struct {
type KafkaSinkEvent struct {
EventMessage string
EventReason string
EventTimestamp time.Time
Expand All @@ -64,16 +69,16 @@ type kafkaSinkEvent struct {

// START: ExternalSink interface implementations

func (self *kafkaSink) Register(mds []sink_api.MetricDescriptor) error {
// Register is a no-op: the kafka sink needs no per-descriptor setup.
func (ks *kafkaSink) Register(mds []sink_api.MetricDescriptor) error {
	return nil
}

func (self *kafkaSink) Unregister(mds []sink_api.MetricDescriptor) error {
// Unregister is a no-op: the kafka sink keeps no per-descriptor state.
func (ks *kafkaSink) Unregister(mds []sink_api.MetricDescriptor) error {
	return nil
}

func (self *kafkaSink) StoreTimeseries(timeseries []sink_api.Timeseries) error {
if timeseries == nil || len(timeseries) <= 0 {
func (ks *kafkaSink) StoreTimeseries(timeseries []sink_api.Timeseries) error {
if !ks.ci.Done() || timeseries == nil || len(timeseries) <= 0 {
return nil
}
for _, t := range timeseries {
Expand All @@ -84,7 +89,7 @@ func (self *kafkaSink) StoreTimeseries(timeseries []sink_api.Timeseries) error {
if t.MetricDescriptor.Type.String() != "" {
seriesName = fmt.Sprintf("%s_%s", seriesName, t.MetricDescriptor.Type.String())
}
sinkPoint := kafkaSinkPoint{
sinkPoint := KafkaSinkPoint{
MetricsName: seriesName,
MetricsValue: t.Point.Value,
MetricsTimestamp: t.Point.End.UTC(),
Expand All @@ -95,20 +100,20 @@ func (self *kafkaSink) StoreTimeseries(timeseries []sink_api.Timeseries) error {
sinkPoint.MetricsTags[key] = value
}
}
err := self.produceKafkaMessage(sinkPoint, self.timeSeriesTopic)
err := ks.produceKafkaMessage(sinkPoint, ks.timeSeriesTopic)
if err != nil {
return fmt.Errorf("failed to produce Kafka messages: %s", err)
}
}
return nil
}

func (self *kafkaSink) StoreEvents(events []kube_api.Event) error {
if events == nil || len(events) <= 0 {
func (ks *kafkaSink) StoreEvents(events []kube_api.Event) error {
if !ks.ci.Done() || events == nil || len(events) <= 0 {
return nil
}
for _, event := range events {
sinkEvent := kafkaSinkEvent{
sinkEvent := KafkaSinkEvent{
EventMessage: event.Message,
EventReason: event.Reason,
EventTimestamp: event.LastTimestamp.UTC(),
Expand All @@ -117,7 +122,7 @@ func (self *kafkaSink) StoreEvents(events []kube_api.Event) error {
EventSource: event.Source,
}

err := self.produceKafkaMessage(sinkEvent, self.eventsTopic)
err := ks.produceKafkaMessage(sinkEvent, ks.eventsTopic)
if err != nil {
return fmt.Errorf("failed to produce Kafka messages: %s", err)
}
Expand All @@ -126,7 +131,7 @@ func (self *kafkaSink) StoreEvents(events []kube_api.Event) error {
}

// produceKafkaMessage produces messages to kafka
func (self *kafkaSink) produceKafkaMessage(v interface{}, topic string) error {
func (ks *kafkaSink) produceKafkaMessage(v interface{}, topic string) error {
if v == nil {
return nil
}
Expand All @@ -135,24 +140,48 @@ func (self *kafkaSink) produceKafkaMessage(v interface{}, topic string) error {
return fmt.Errorf("failed to transform the items to json : %s", err)
}
message := &proto.Message{Value: []byte(string(jsonItems))}
_, err = self.producer.Produce(topic, partition, message)
_, err = ks.producer.Produce(topic, partition, message)
if err != nil {
return fmt.Errorf("failed to produce message to %s:%d: %s", topic, partition, err)
}
return nil
}

func (self *kafkaSink) DebugInfo() string {
info := fmt.Sprintf("%s\n", self.Name())
info += fmt.Sprintf("There are two kafka's topics: %s,%s:\n", self.eventsTopic, self.timeSeriesTopic)
info += fmt.Sprintf("The kafka's broker list is: %s", self.sinkBrokerHosts)
// DebugInfo returns a human-readable summary of the sink configuration
// (topics and broker list) and whether the kafka client is initialized.
func (ks *kafkaSink) DebugInfo() string {
	info := fmt.Sprintf("%s\n", ks.Name())
	info += fmt.Sprintf("There are two kafka topics: %s,%s:\n", ks.eventsTopic, ks.timeSeriesTopic)
	info += fmt.Sprintf("Kafka broker list is: %s", ks.sinkBrokerHosts)
	if !ks.ci.Done() {
		// fmt.Sprintf with no formatting verbs is a vet/staticcheck (S1039)
		// finding; append the literal directly, on its own line so it does
		// not run onto the broker list above.
		info += "\nKafka client has not been initialized yet."
	}
	return info
}

func (self *kafkaSink) Name() string {
// Name identifies this sink in logs and debug output.
func (ks *kafkaSink) Name() string {
	return "Apache-Kafka Sink"
}

// ping checks that the kafka cluster is reachable. It is invoked
// periodically by the ClientInitializer as a health check.
func (ks *kafkaSink) ping() error {
	broker, err := kafka.Dial(ks.sinkBrokerHosts, ks.brokerConf)
	if err != nil {
		return err
	}
	// The original discarded the dialed broker, leaking one connection per
	// health check; close it as setupClient does.
	broker.Close()
	return nil
}

// setupClient dials the kafka cluster and creates the producer used by
// the store methods. It is called (and retried on failure) by the
// ClientInitializer until it succeeds.
func (ks *kafkaSink) setupClient() error {
	glog.V(3).Infof("attempting to setup kafka sink")
	broker, err := kafka.Dial(ks.sinkBrokerHosts, ks.brokerConf)
	if err != nil {
		return fmt.Errorf("failed to connect to kafka cluster: %s", err)
	}
	// NOTE(review): the broker is closed when this function returns while the
	// producer created from it is retained in ks.producer — confirm the
	// optiopay/kafka producer remains usable after Broker.Close.
	defer broker.Close()
	//create kafka producer
	conf := kafka.NewProducerConf()
	conf.RequiredAcks = proto.RequiredAcksLocal
	sinkProducer := broker.Producer(conf)
	ks.producer = sinkProducer
	glog.V(3).Infof("kafka sink setup successfully")
	return nil
}

// init registers the kafka sink factory under the "kafka" scheme so that
// --sink=kafka:... URLs resolve to NewKafkaSink.
func init() {
	extpoints.SinkFactories.Register(NewKafkaSink, "kafka")
}
Expand All @@ -164,21 +193,22 @@ func NewKafkaSink(uri *url.URL, _ extpoints.HeapsterConf) ([]sink_api.ExternalSi
return nil, fmt.Errorf("failed to parser url's query string: %s", err)
}

if len(opts["timeseriestopic"]) < 1 {
return nil, fmt.Errorf("There is no timeseriestopic assign for config kafka-sink")
kafkaSink.timeSeriesTopic = timeSeriesTopic
if len(opts["timeseriestopic"]) > 0 {
kafkaSink.timeSeriesTopic = opts["timeseriestopic"][0]
}
kafkaSink.timeSeriesTopic = opts["timeseriestopic"][0]

if len(opts["eventstopic"]) < 1 {
return nil, fmt.Errorf("There is no eventstopic assign for config kafka-sink")
kafkaSink.eventsTopic = eventsTopic
if len(opts["eventstopic"]) > 0 {
kafkaSink.eventsTopic = opts["eventstopic"][0]
}
kafkaSink.eventsTopic = opts["eventstopic"][0]

if len(opts["brokers"]) < 1 {
return nil, fmt.Errorf("There is no broker assign for connecting kafka broker")
return nil, fmt.Errorf("There is no broker assigned for connecting kafka broker")
}
kafkaSink.sinkBrokerHosts = append(kafkaSink.sinkBrokerHosts, opts["brokers"]...)

glog.V(2).Infof("initializing kafka sink with brokers - %v", kafkaSink.sinkBrokerHosts)
//connect to kafka cluster
brokerConf := kafka.NewBrokerConf(brokerClientID)
brokerConf.DialTimeout = brokerDialTimeout
Expand All @@ -188,17 +218,8 @@ func NewKafkaSink(uri *url.URL, _ extpoints.HeapsterConf) ([]sink_api.ExternalSi
brokerConf.LeaderRetryWait = brokerLeaderRetryWait
brokerConf.AllowTopicCreation = true

broker, err := kafka.Dial(kafkaSink.sinkBrokerHosts, brokerConf)
if err != nil {
return nil, fmt.Errorf("failed to connect to kafka cluster: %s", err)
}
defer broker.Close()

//create kafka producer
conf := kafka.NewProducerConf()
conf.RequiredAcks = proto.RequiredAcksLocal
sinkProducer := broker.Producer(conf)
kafkaSink.producer = sinkProducer
glog.Infof("created kafka sink successfully with brokers: %v", kafkaSink.sinkBrokerHosts)
// Store broker configuration.
kafkaSink.brokerConf = brokerConf
kafkaSink.ci = sinkutil.NewClientInitializer("kafka", kafkaSink.setupClient, kafkaSink.ping, 10*time.Second)
return []sink_api.ExternalSink{&kafkaSink}, nil
}
15 changes: 9 additions & 6 deletions sinks/kafka/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"time"

"fmt"

"github.com/optiopay/kafka/proto"
"github.com/stretchr/testify/assert"
sink_api "k8s.io/heapster/sinks/api"
sinkutil "k8s.io/heapster/sinks/util"
kube_api "k8s.io/kubernetes/pkg/api"
kube_api_unv "k8s.io/kubernetes/pkg/api/unversioned"
)
Expand Down Expand Up @@ -59,10 +61,11 @@ func NewFakeSink() fakeKafkaSink {
fakesinkBrokerHosts := make([]string, 2)
return fakeKafkaSink{
&kafkaSink{
producer,
fakeTimeSeriesTopic,
fakeEventsTopic,
fakesinkBrokerHosts,
producer: producer,
timeSeriesTopic: fakeTimeSeriesTopic,
eventsTopic: fakeEventsTopic,
sinkBrokerHosts: fakesinkBrokerHosts,
ci: sinkutil.NewClientInitializer("test", func() error { return nil }, func() error { return nil }, time.Millisecond),
},
producer,
}
Expand Down Expand Up @@ -180,7 +183,7 @@ func TestStoreTimeseriesSingleTimeserieInput(t *testing.T) {

assert.Equal(t, 1, len(fakeSink.fakeProducer.msgs))

timeStr, err := timeNow.MarshalJSON()
timeStr, err := timeNow.UTC().MarshalJSON()
assert.NoError(t, err)

msgString := fmt.Sprintf(`{"MetricsName":"test/metric/1_cumulative","MetricsValue":123456,"MetricsTimestamp":%s,"MetricsTags":{"container_name":"docker","hostname":"localhost","pod_id":"aaaa-bbbb-cccc-dddd","test":"notvisible"}}`, timeStr)
Expand Down Expand Up @@ -235,7 +238,7 @@ func TestStoreTimeseriesMultipleTimeseriesInput(t *testing.T) {

assert.Equal(t, 2, len(fakeSink.fakeProducer.msgs))

timeStr, err := timeNow.MarshalJSON()
timeStr, err := timeNow.UTC().MarshalJSON()
assert.NoError(t, err)

msgString1 := fmt.Sprintf(`{"MetricsName":"test/metric/1_cumulative","MetricsValue":123456,"MetricsTimestamp":%s,"MetricsTags":{"container_name":"docker","hostname":"localhost","pod_id":"aaaa-bbbb-cccc-dddd","test":"notvisible"}}`, timeStr)
Expand Down

0 comments on commit 6725da6

Please sign in to comment.