Skip to content

Commit

Permalink
Unittest for kube-apiserver backed store
Browse files Browse the repository at this point in the history
Signed-off-by: Aylei <rayingecho@gmail.com>
  • Loading branch information
aylei committed Oct 29, 2019
1 parent 966ba78 commit f6e67da
Show file tree
Hide file tree
Showing 3 changed files with 1,030 additions and 18 deletions.
8 changes: 7 additions & 1 deletion pkg/apiserver/storage/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package storage
import (
"fmt"

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
Expand Down Expand Up @@ -55,7 +57,11 @@ func (f *ApiServerRestOptionsFactory) newApiServerStorageDecorator() generic.Sto
getAttrsFunc storage.AttrFunc,
trigger storage.TriggerPublisherFunc,
) (storage.Interface, factory.DestroyFunc) {
return NewApiServerStore(f.RestConfig, f.Codec, f.StorageNamespace, objectType, newListFunc)
cli, err := versioned.NewForConfig(f.RestConfig)
if err != nil {
glog.Fatalf("failed to create Clientset: %v", err)
}
return NewApiServerStore(cli, f.Codec, f.StorageNamespace, objectType, newListFunc)
}
}

Expand Down
43 changes: 26 additions & 17 deletions pkg/apiserver/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ import (
informerv1alpha1 "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions/pingcap/v1alpha1"
listerv1alpha1 "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/client-go/rest"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// store implements a ConfigMap backed storage.Interface
Expand All @@ -58,11 +56,8 @@ type store struct {
}

// New returns an kubernetes configmap implementation of storage.Interface.
func NewApiServerStore(restConfig *rest.Config, codec runtime.Codec, namespace string, objType runtime.Object, newListFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc) {
cli, err := versioned.NewForConfig(restConfig)
if err != nil {
glog.Fatalf("failed to create Clientset: %v", err)
}
func NewApiServerStore(cli versioned.Interface, codec runtime.Codec, namespace string, objType runtime.Object, newListFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc) {

informerFactory := informers.NewSharedInformerFactoryWithOptions(cli, 1*time.Minute, informers.WithNamespace(namespace))

inf := informerFactory.Pingcap().V1alpha1().DataResources()
Expand Down Expand Up @@ -141,11 +136,13 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,

func (s *store) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
objKey := newObjectKey(key)
return s.client.Delete(objKey.fullName(), &metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{
deleteOpt := &metav1.DeleteOptions{}
if preconditions != nil && preconditions.UID != nil {
deleteOpt.Preconditions = &metav1.Preconditions{
UID: preconditions.UID,
},
})
}
}
return s.client.Delete(objKey.fullName(), deleteOpt)
}

func (s *store) Watch(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate) (watch.Interface, error) {
Expand All @@ -170,6 +167,7 @@ func (s *store) WatchList(ctx context.Context, key string, resourceVersion strin
return outWatcher, nil
}

// FIXME: resourceVersion should be respected
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
objKey := newObjectKey(key)
selector, err := objKey.selector()
Expand All @@ -196,6 +194,7 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out
return decode(s.codec, s.versioner, ret[0].Data, out, int64(rv))
}

// FIXME: resourceVersion should be respected
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
Expand Down Expand Up @@ -226,6 +225,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
return s.versioner.UpdateList(listObj, 0, "")
}

// FIXME: resourceVersion should be respected
func (s *store) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
Expand Down Expand Up @@ -293,16 +293,18 @@ func (s *store) GuaranteedUpdate(ctx context.Context,
return state, nil
}
var origState *objState
origState, err = getCurrentState()
if err != nil {
return err
}
shouldRefresh := false
shouldRefresh := true
for {
if shouldRefresh {
// Refresh object
origState, err = getCurrentState()
if err != nil {
if apierrors.IsNotFound(err) {
if ignoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
}
return err
}
shouldRefresh = false
Expand Down Expand Up @@ -330,6 +332,12 @@ func (s *store) GuaranteedUpdate(ctx context.Context,
toUpdate.Data = data
updateResp, err := s.client.Update(toUpdate)
if err != nil {
if apierrors.IsNotFound(err) {
if ignoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
}
if apierrors.IsConflict(err) {
shouldRefresh = true
// Retry
Expand Down Expand Up @@ -449,6 +457,7 @@ func (w *watcherWrapperWithPrediction) run() {
}
}
case <-w.stopCh:
close(w.resultChan)
return
}
}
Expand Down
Loading

0 comments on commit f6e67da

Please sign in to comment.