Skip to content

Commit

Permalink
Merge pull request kubernetes-retired#558 from mwielgus/model_api_test
Browse files Browse the repository at this point in the history
Model api integration test + more debugs
  • Loading branch information
vishh committed Sep 15, 2015
2 parents 44b0119 + 8461efa commit 655e135
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 9 deletions.
14 changes: 11 additions & 3 deletions integration/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ type kubeFramework interface {
// Returns the node hostnames.
GetNodes() ([]string, error)

// Returns pod names in the cluster.
// TODO: Remove, or mix with namespace
GetPodNames() ([]string, error)

// Returns pods in the cluster.
GetPods() ([]string, error)
GetPodList() (*api.PodList, error)

WaitUntilPodRunning(ns string, podLabels map[string]string, timeout time.Duration) error
WaitUntilServiceActive(svc *api.Service, timeout time.Duration) error
Expand Down Expand Up @@ -424,9 +428,13 @@ func (self *realKubeFramework) GetNodes() ([]string, error) {
return nodes, nil
}

func (self *realKubeFramework) GetPods() ([]string, error) {
func (self *realKubeFramework) GetPodList() (*api.PodList, error) {
return self.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
}

func (self *realKubeFramework) GetPodNames() ([]string, error) {
var pods []string
podList, err := self.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
podList, err := self.GetPodList()
if err != nil {
return pods, err
}
Expand Down
114 changes: 108 additions & 6 deletions integration/heapster_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,34 +52,50 @@ var (
)

func deleteAll(fm kubeFramework, ns string, service *kube_api.Service, rc *kube_api.ReplicationController) error {
glog.V(2).Infof("Deleting rc %s/%s...", ns, rc.Name)
if err := fm.DeleteRC(ns, rc); err != nil {
glog.V(2).Infof("Failed to delete rc: %v", err)
return err
}
glog.V(2).Infof("Deleted rc %s/%s.", ns, rc.Name)

glog.V(2).Infof("Deleting service %s/%s.", ns, service.Name)
if err := fm.DeleteService(ns, service); err != nil {
glog.V(2).Infof("Failed to delete service: %v", err)
return err
}
glog.V(2).Infof("Deleted service %s/%s.", ns, service.Name)
return nil
}

func createAll(fm kubeFramework, ns string, service **kube_api.Service, rc **kube_api.ReplicationController) error {
glog.V(2).Infof("Creating rc %s/%s...", ns, (*rc).Name)
if newRc, err := fm.CreateRC(ns, *rc); err != nil {
glog.V(2).Infof("Failed to create rc: %v", err)
return err
} else {
*rc = newRc
}
glog.V(2).Infof("Created rc %s/%s.", ns, (*rc).Name)

glog.V(2).Infof("Creating service %s/%s...", ns, (*service).Name)
if newSvc, err := fm.CreateService(ns, *service); err != nil {
glog.V(2).Infof("Failed to create service: %v", err)
return err
} else {
*service = newSvc
}
glog.V(2).Infof("Created servuce %s/%s.", ns, (*service).Name)

return nil
}

func removeHeapsterImage(fm kubeFramework) error {
glog.V(2).Infof("Removing heapster image.")
if err := removeDockerImage(*heapsterImage); err != nil {
glog.Error(err)
glog.Errorf("Failed to remove Heapster image: %v", err)
} else {
glog.V(2).Infof("Heapster image removed.")
}
if nodes, err := fm.GetNodes(); err == nil {
for _, node := range nodes {
Expand All @@ -93,6 +109,7 @@ func removeHeapsterImage(fm kubeFramework) error {
}

func buildAndPushHeapsterImage(hostnames []string) error {
glog.V(2).Info("Building and pushing Heapster image...")
curwd, err := os.Getwd()
if err != nil {
return err
Expand All @@ -108,7 +125,7 @@ func buildAndPushHeapsterImage(hostnames []string) error {
return err
}
}
glog.V(2).Info("built and pushed heapster image")
glog.V(2).Info("Heapster image pushed.")
return os.Chdir(curwd)
}

Expand Down Expand Up @@ -190,6 +207,31 @@ func getSchema(fm kubeFramework, svc *kube_api.Service) (*api_v1.TimeseriesSchem
return &timeseriesSchema, nil
}

func getModelMetrics(fm kubeFramework, svc *kube_api.Service, pod *kube_api.Pod) (*api_v1.MetricResultList, error) {
url := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s",
pod.Namespace,
pod.Name,
"cpu-usage")

body, err := fm.Client().Get().
Namespace(svc.Namespace).
Prefix("proxy").
Resource("services").
Name(svc.Name).
Suffix(url).
Do().Raw()
if err != nil {
return nil, err
}
var metrics api_v1.MetricResultList
if err := json.Unmarshal(body, &metrics); err != nil {
glog.V(2).Infof("response body: %v", string(body))
return nil, err
}
return &metrics, nil

}

var expectedSystemContainers = map[string]struct{}{
"machine": {},
"kubelet": {},
Expand Down Expand Up @@ -377,6 +419,38 @@ func runSinksTest(fm kubeFramework, svc *kube_api.Service) error {
return nil
}

func runModelTest(fm kubeFramework, svc *kube_api.Service) error {
podList, err := fm.GetPodList()
if err != nil {
return err
}
if len(podList.Items) == 0 {
return fmt.Errorf("Empty pod list")
}
for _, pod := range podList.Items {
metrics, err := getModelMetrics(fm, svc, &pod)
if err != nil {
return fmt.Errorf("error while getting metrics for %s/%s: %v", pod.Namespace, pod.Name, err)
}
if len(metrics.Items) == 0 {
return fmt.Errorf("empty metrics for: %s/%s", pod.Namespace, pod.Name)
}
if len(metrics.Items[0].Metrics) == 0 {
return fmt.Errorf("empty metrics for: %s/%s", pod.Namespace, pod.Name)
}
if time.Now().Sub(metrics.Items[0].LatestTimestamp).Seconds() > 30 {
return fmt.Errorf("Corrupted last timestamp for: %s/%s", pod.Namespace, pod.Name)
}
if time.Now().Sub(metrics.Items[0].Metrics[0].Timestamp).Seconds() > 30 {
return fmt.Errorf("Corrupted timestamp for: %s/%s", pod.Namespace, pod.Name)
}
if metrics.Items[0].Metrics[0].Value > 10000 {
return fmt.Errorf("Value too big for: %s/%s", pod.Namespace, pod.Name)
}
}
return nil
}

func apiTest(kubeVersion string) error {
fm, err := newKubeFramework(kubeVersion)
if err != nil {
Expand All @@ -403,7 +477,7 @@ func apiTest(kubeVersion string) error {
if err := fm.WaitUntilServiceActive(svc, time.Minute); err != nil {
return err
}
expectedPods, err := fm.GetPods()
expectedPods, err := fm.GetPodNames()
if err != nil {
return err
}
Expand All @@ -413,13 +487,38 @@ func apiTest(kubeVersion string) error {
}
testFuncs := []func() error{
func() error {
return runHeapsterMetricsTest(fm, svc, expectedNodes, expectedPods)
glog.V(2).Infof("Heapster metrics test...")
err := runHeapsterMetricsTest(fm, svc, expectedNodes, expectedPods)
if err == nil {
glog.V(2).Infof("Heapster metrics test: OK")
} else {
glog.V(2).Infof("Heapster metrics test error: %v", err)
}
return err
},
func() error {
return runSinksTest(fm, svc)
glog.V(2).Infof("Sinks test...")
err := runSinksTest(fm, svc)
if err == nil {
glog.V(2).Infof("Sinks test: OK")
} else {
glog.V(2).Infof("Sinks test error: %v", err)
}
return err
},
func() error {
glog.V(2).Infof("Model test")
err := runModelTest(fm, svc)
if err == nil {
glog.V(2).Infof("Model test: OK")
} else {
glog.V(2).Infof("Model test error: %v", err)
}
return err
},
}
attempts := *maxRetries
glog.Infof("Starting tests")
for {
var err error
for _, testFunc := range testFuncs {
Expand All @@ -431,13 +530,16 @@ func apiTest(kubeVersion string) error {
continue
}
if err == nil {
glog.V(2).Infof("All tests passed.")
break
}
if attempts == 0 {
glog.V(2).Info("Too many attempts.")
return err
}
glog.V(2).Infof("Some tests failed. Retrying.")
attempts--
time.Sleep(time.Second)
time.Sleep(time.Second * 10)
}
deleteAll(fm, ns, svc, rc)
removeHeapsterImage(fm)
Expand Down

0 comments on commit 655e135

Please sign in to comment.