From e745b63987810a29f3d85a271294f741af54424a Mon Sep 17 00:00:00 2001
From: Alex Robinson
Date: Fri, 18 Nov 2016 16:36:01 -0500
Subject: [PATCH 1/3] client: Add a package for managing leases on arbitrary kv keys

Built to initially be used by the migration code since our existing lease
mechanisms for ranges and SQL schemas aren't very generalizable.
---
 pkg/internal/client/lease.go      | 156 +++++++++++++
 pkg/internal/client/lease.pb.go   | 362 ++++++++++++++++++++++++++++++
 pkg/internal/client/lease.proto   |  28 +++
 pkg/internal/client/lease_test.go | 180 +++++++++++++++
 4 files changed, 726 insertions(+)
 create mode 100644 pkg/internal/client/lease.go
 create mode 100644 pkg/internal/client/lease.pb.go
 create mode 100644 pkg/internal/client/lease.proto
 create mode 100644 pkg/internal/client/lease_test.go

diff --git a/pkg/internal/client/lease.go b/pkg/internal/client/lease.go
new file mode 100644
index 000000000000..ff395d34e36e
--- /dev/null
+++ b/pkg/internal/client/lease.go
@@ -0,0 +1,156 @@
+// Copyright 2016 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package client
+
+import (
+	"fmt"
+	"time"
+
+	"golang.org/x/net/context"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/pkg/errors"
+)
+
+// DefaultLeaseDuration is the duration a lease will be acquired for if no
+// duration was specified in a LeaseManager's options.
+// Exported for testing purposes.
+const DefaultLeaseDuration = time.Minute
+
+// LeaseNotAvailableError indicates that the lease the caller attempted to
+// acquire is currently held by a different client.
+type LeaseNotAvailableError struct {
+	key        roachpb.Key
+	expiration hlc.Timestamp
+}
+
+func (e *LeaseNotAvailableError) Error() string {
+	return fmt.Sprintf("lease %q is not available until at least %s", e.key, e.expiration)
+}
+
+// LeaseManager provides functionality for acquiring and managing leases
+// via the kv api.
+type LeaseManager struct {
+	db            *DB
+	clock         *hlc.Clock
+	clientID      string
+	leaseDuration time.Duration
+}
+
+// Lease contains the state of a lease on a particular key.
+type Lease struct {
+	key roachpb.Key
+	val *LeaseVal
+}
+
+// LeaseManagerOptions are used to configure a new LeaseManager.
+type LeaseManagerOptions struct {
+	// ClientID must be unique to this LeaseManager instance.
+	ClientID      string
+	LeaseDuration time.Duration
+}
+
+// NewLeaseManager allocates a new LeaseManager.
+func NewLeaseManager(db *DB, clock *hlc.Clock, options LeaseManagerOptions) *LeaseManager {
+	if options.ClientID == "" {
+		options.ClientID = uuid.MakeV4().String()
+	}
+	if options.LeaseDuration <= 0 {
+		options.LeaseDuration = DefaultLeaseDuration
+	}
+	return &LeaseManager{
+		db:            db,
+		clock:         clock,
+		clientID:      options.ClientID,
+		leaseDuration: options.LeaseDuration,
+	}
+}
+
+// AcquireLease attempts to grab a lease on the provided key.
Returns a non-nil +// lease object if it was successful, or an error if it failed to acquire the +// lease for any reason. +// +// NB: Acquiring a non-expired lease is allowed if this LeaseManager's clientID +// matches the lease owner's ID. This behavior allows a process to re-grab +// leases without having to wait if it restarts and uses the same ID. +func (m *LeaseManager) AcquireLease(ctx context.Context, key roachpb.Key) (*Lease, error) { + lease := &Lease{ + key: key, + } + if err := m.db.Txn(ctx, func(txn *Txn) error { + var val LeaseVal + err := txn.GetProto(key, &val) + if err != nil { + return err + } + if !m.leaseAvailable(&val) { + return &LeaseNotAvailableError{key: key, expiration: val.Expiration} + } + lease.val = &LeaseVal{ + Owner: m.clientID, + Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0), + } + return txn.Put(key, lease.val) + }); err != nil { + return nil, err + } + return lease, nil +} + +func (m *LeaseManager) leaseAvailable(val *LeaseVal) bool { + return val.Owner == m.clientID || m.timeRemaining(val) <= 0 +} + +// TimeRemaining returns the amount of time left on the given lease. +func (m *LeaseManager) TimeRemaining(l *Lease) time.Duration { + return m.timeRemaining(l.val) +} + +func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration { + return val.Expiration.GoTime().Sub(m.clock.Now().GoTime()) - m.clock.MaxOffset() +} + +// ExtendLease attempts to push the expiration time of the lease farther out +// into the future. +func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error { + if m.TimeRemaining(l) < 0 { + return errors.Errorf("can't extend lease that expired at time %s", l.val.Expiration) + } + + newVal := &LeaseVal{ + Owner: m.clientID, + Expiration: m.clock.Now().Add(m.leaseDuration.Nanoseconds(), 0), + } + + err := m.db.CPut(ctx, l.key, newVal, l.val) + if err != nil { + if _, ok := err.(*roachpb.ConditionFailedError); ok { + // Something is wrong - immediately expire the local lease state. + l.val.Expiration = hlc.ZeroTimestamp + return errors.Wrap(err, "local lease state %v out of sync with DB state") + } + return err + } + l.val = newVal + return nil +} + +// ReleaseLease attempts to release the given lease so that another process can +// grab it. +func (m *LeaseManager) ReleaseLease(ctx context.Context, l *Lease) error { + return m.db.CPut(ctx, l.key, nil, l.val) +} diff --git a/pkg/internal/client/lease.pb.go b/pkg/internal/client/lease.pb.go new file mode 100644 index 000000000000..dcf4d44befb3 --- /dev/null +++ b/pkg/internal/client/lease.pb.go @@ -0,0 +1,362 @@ +// Code generated by protoc-gen-gogo. +// source: cockroach/pkg/internal/client/lease.proto +// DO NOT EDIT! + +/* + Package client is a generated protocol buffer package. + + It is generated from these files: + cockroach/pkg/internal/client/lease.proto + + It has these top-level messages: + LeaseVal +*/ +package client + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" +import cockroach_util_hlc "github.com/cockroachdb/cockroach/pkg/util/hlc" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type LeaseVal struct { + // An opaque string that should be unique per client to identify which client + // owns the lease. + Owner string `protobuf:"bytes,1,opt,name=owner" json:"owner"` + // The expiration time of the lease. + Expiration cockroach_util_hlc.Timestamp `protobuf:"bytes,2,opt,name=expiration" json:"expiration"` +} + +func (m *LeaseVal) Reset() { *m = LeaseVal{} } +func (m *LeaseVal) String() string { return proto.CompactTextString(m) } +func (*LeaseVal) ProtoMessage() {} +func (*LeaseVal) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{0} } + +func init() { + proto.RegisterType((*LeaseVal)(nil), "cockroach.client.LeaseVal") +} +func (m *LeaseVal) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LeaseVal) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintLease(dAtA, i, uint64(len(m.Owner))) + i += copy(dAtA[i:], m.Owner) + dAtA[i] = 0x12 + i++ + i = encodeVarintLease(dAtA, i, uint64(m.Expiration.Size())) + n1, err := m.Expiration.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + return i, nil +} + +func encodeFixed64Lease(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Lease(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintLease(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *LeaseVal) Size() (n int) { + var l int + _ = l + l = len(m.Owner) + n += 1 + l + sovLease(uint64(l)) + l = m.Expiration.Size() + n += 1 + l + sovLease(uint64(l)) + return n +} + +func sovLease(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozLease(x uint64) (n int) { + return sovLease(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *LeaseVal) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LeaseVal: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LeaseVal: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Owner", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + 
} + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLease + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Owner = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Expiration", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLease + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Expiration.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLease(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLease + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipLease(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLease + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLease + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLease + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthLease + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLease + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipLease(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthLease = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowLease = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("cockroach/pkg/internal/client/lease.proto", fileDescriptorLease) } + +var fileDescriptorLease = []byte{ + // 216 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xd2, 0x4c, 0xce, 0x4f, 0xce, + 0x2e, 0xca, 0x4f, 0x4c, 0xce, 0xd0, 0x2f, 0xc8, 0x4e, 0xd7, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, + 0x4b, 0xcc, 0xd1, 0x4f, 0xce, 0xc9, 0x4c, 0xcd, 0x2b, 0xd1, 0xcf, 
0x49, 0x4d, 0x2c, 0x4e, 0xd5, + 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x80, 0x2b, 0xd5, 0x83, 0xc8, 0x4a, 0xa9, 0xa1, 0x6a, + 0x2e, 0x2d, 0xc9, 0xcc, 0xd1, 0xcf, 0xc8, 0x49, 0xd6, 0x2f, 0xc9, 0xcc, 0x4d, 0x2d, 0x2e, 0x49, + 0xcc, 0x2d, 0x80, 0xe8, 0x94, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x33, 0xf5, 0x41, 0x2c, 0x88, + 0xa8, 0x52, 0x36, 0x17, 0x87, 0x0f, 0xc8, 0xf8, 0xb0, 0xc4, 0x1c, 0x21, 0x29, 0x2e, 0xd6, 0xfc, + 0xf2, 0xbc, 0xd4, 0x22, 0x09, 0x46, 0x05, 0x46, 0x0d, 0x4e, 0x27, 0x96, 0x13, 0xf7, 0xe4, 0x19, + 0x82, 0x20, 0x42, 0x42, 0xce, 0x5c, 0x5c, 0xa9, 0x15, 0x05, 0x99, 0x45, 0x89, 0x25, 0x99, 0xf9, + 0x79, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xdc, 0x46, 0xb2, 0x7a, 0x08, 0xc7, 0x80, 0xac, 0xd5, 0xcb, + 0xc8, 0x49, 0xd6, 0x0b, 0x81, 0x59, 0x0b, 0xd5, 0x8f, 0xa4, 0xcd, 0x49, 0xe1, 0xc4, 0x43, 0x39, + 0x86, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0xbc, 0xf1, 0x48, 0x8e, 0xf1, 0xc1, 0x23, + 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0xa2, 0xd8, 0x20, 0x9e, 0x01, 0x04, 0x00, 0x00, 0xff, 0xff, + 0xec, 0xea, 0x38, 0xb6, 0x0a, 0x01, 0x00, 0x00, +} diff --git a/pkg/internal/client/lease.proto b/pkg/internal/client/lease.proto new file mode 100644 index 000000000000..b2265ac32477 --- /dev/null +++ b/pkg/internal/client/lease.proto @@ -0,0 +1,28 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +syntax = "proto2"; +package cockroach.client; +option go_package = "client"; + +import "cockroach/pkg/util/hlc/timestamp.proto"; +import "gogoproto/gogo.proto"; + +message LeaseVal { + // An opaque string that should be unique per client to identify which client + // owns the lease. + optional string owner = 1 [(gogoproto.nullable) = false]; + // The expiration time of the lease. + optional util.hlc.Timestamp expiration = 2 [(gogoproto.nullable) = false]; +} diff --git a/pkg/internal/client/lease_test.go b/pkg/internal/client/lease_test.go new file mode 100644 index 000000000000..812ed645eca3 --- /dev/null +++ b/pkg/internal/client/lease_test.go @@ -0,0 +1,180 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package client_test + +import ( + "testing" + "time" + + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +const ( + clientID1 = "1" + clientID2 = "2" +) + +var ( + leaseKey = roachpb.Key("/SystemVersion/lease") +) + +func TestAcquireAndRelease(t *testing.T) { + defer leaktest.AfterTest(t)() + s, db := setup(t) + defer s.Stopper().Stop() + + ctx := context.Background() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + lm := client.NewLeaseManager(db, clock, client.LeaseManagerOptions{ClientID: clientID1}) + + l, err := lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + if err := lm.ReleaseLease(ctx, l); err != nil { + t.Fatal(err) + } + if err := lm.ReleaseLease(ctx, l); !testutils.IsError(err, "unexpected value") { + t.Fatal(err) + } + + l, err = lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + if err := lm.ReleaseLease(ctx, l); err != nil { + t.Fatal(err) + } +} + +func TestReacquireLease(t *testing.T) { + defer leaktest.AfterTest(t)() + s, db := setup(t) + defer s.Stopper().Stop() + + ctx := context.Background() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + lm := client.NewLeaseManager(db, clock, client.LeaseManagerOptions{ClientID: clientID1}) + + l, err := lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + + // We allow re-acquiring the same lease as long as the client ID is + // the same to allow a client to reacquire its own leases rather than + // having to wait them out if it crashes and restarts. 
+ l, err = lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + if err := lm.ReleaseLease(ctx, l); err != nil { + t.Fatal(err) + } +} + +func TestExtendLease(t *testing.T) { + defer leaktest.AfterTest(t)() + s, db := setup(t) + defer s.Stopper().Stop() + + ctx := context.Background() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + lm := client.NewLeaseManager(db, clock, client.LeaseManagerOptions{ClientID: clientID1}) + + l, err := lm.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + + manual.Increment(int64(time.Second)) + timeRemainingBefore := lm.TimeRemaining(l) + if err := lm.ExtendLease(ctx, l); err != nil { + t.Fatal(err) + } + timeRemainingAfter := lm.TimeRemaining(l) + if !(timeRemainingAfter > timeRemainingBefore) { + t.Errorf("expected time remaining after renewal (%s) to be greater than before renewal (%s)", + timeRemainingAfter, timeRemainingBefore) + } + + manual.Increment(int64(client.DefaultLeaseDuration) + 1) + if tr := lm.TimeRemaining(l); tr >= 0 { + t.Errorf("expected negative time remaining on lease, got %s", tr) + } + if err := lm.ExtendLease(ctx, l); !testutils.IsError(err, "can't extend lease that expired") { + t.Fatalf("didn't get expected error when renewing lease %+v: %v", l, err) + } + + if err := lm.ReleaseLease(ctx, l); err != nil { + t.Fatal(err) + } +} + +func TestLeasesMultipleClients(t *testing.T) { + defer leaktest.AfterTest(t)() + s, db := setup(t) + defer s.Stopper().Stop() + + ctx := context.Background() + manual1 := hlc.NewManualClock(123) + clock1 := hlc.NewClock(manual1.UnixNano, time.Nanosecond) + manual2 := hlc.NewManualClock(123) + clock2 := hlc.NewClock(manual2.UnixNano, time.Nanosecond) + lm1 := client.NewLeaseManager(db, clock1, client.LeaseManagerOptions{ClientID: clientID1}) + lm2 := client.NewLeaseManager(db, clock2, client.LeaseManagerOptions{ClientID: clientID2}) + + l1, err := lm1.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + _, err = lm2.AcquireLease(ctx, leaseKey) + if !testutils.IsError(err, "is not available until") { + t.Fatalf("didn't get expected error trying to acquire already held lease: %v", err) + } + if _, ok := err.(*client.LeaseNotAvailableError); !ok { + t.Fatalf("expected LeaseNotAvailableError, got %v", err) + } + + // Ensure a lease can be "stolen" after it's expired. + manual2.Increment(int64(client.DefaultLeaseDuration) + 1) + l2, err := lm2.AcquireLease(ctx, leaseKey) + if err != nil { + t.Fatal(err) + } + + // lm1's clock indicates that its lease should still be valid, but it doesn't + // own it anymore. + manual1.Increment(int64(client.DefaultLeaseDuration) / 2) + if err := lm1.ExtendLease(ctx, l1); !testutils.IsError(err, "out of sync with DB state") { + t.Fatalf("didn't get expected error trying to extend expired lease: %v", err) + } + if err := lm1.ReleaseLease(ctx, l1); !testutils.IsError(err, "unexpected value") { + t.Fatalf("didn't get expected error trying to release stolen lease: %v", err) + } + + if err := lm2.ReleaseLease(ctx, l2); err != nil { + t.Fatal(err) + } +} From 3a603c7aa29746e745dd20b41de8e6e7efbbd325 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Wed, 26 Oct 2016 12:42:30 -0400 Subject: [PATCH 2/3] migrations: Add a package to manage system migrations on startup This is needed to allow us to make certain changes between versions that wouldn't be safe otherwise. 
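For illustration only (not part of this patch), a new migration would be
registered by appending a descriptor to backwardCompatibleMigrations with an
idempotent workFn; the second entry below and its body are hypothetical:

    // Hypothetical sketch of registering an additional migration.
    var backwardCompatibleMigrations = []migrationDescriptor{
        {
            name:   "example migration",
            workFn: exampleNoopMigration,
        },
        {
            // This entry is made up for illustration. workFn must be
            // idempotent: a node can die partway through, and another node
            // will re-run any migration whose completion key was never
            // written under keys.MigrationPrefix.
            name: "backfill example system key",
            workFn: func(ctx context.Context, r runner) error {
                // Real work would go through r.db or r.sqlExecutor.
                return nil
            },
        },
    }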
--- pkg/keys/constants.go | 10 + pkg/migrations/migrations.go | 237 +++++++++++++++++++++ pkg/migrations/migrations_test.go | 330 ++++++++++++++++++++++++++++++ pkg/server/server.go | 33 ++- 4 files changed, 608 insertions(+), 2 deletions(-) create mode 100644 pkg/migrations/migrations.go create mode 100644 pkg/migrations/migrations_test.go diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index e738de4ef59b..557200425211 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -191,6 +191,16 @@ var ( SystemPrefix = roachpb.Key{systemPrefixByte} SystemMax = roachpb.Key{systemMaxByte} + // MigrationPrefix specifies the key prefix to store all migration details. + MigrationPrefix = roachpb.Key(makeKey(SystemPrefix, roachpb.RKey("system-version/"))) + + // MigrationKeyMax is the maximum value for any system migration key. + MigrationKeyMax = MigrationPrefix.PrefixEnd() + + // MigrationLease is the key that nodes must take a lease on in order to run + // system migrations on the cluster. + MigrationLease = roachpb.Key(makeKey(MigrationPrefix, roachpb.RKey("lease"))) + // NodeLivenessPrefix specifies the key prefix for the node liveness // table. Note that this should sort before the rest of the system // keyspace in order to limit the number of ranges which must use diff --git a/pkg/migrations/migrations.go b/pkg/migrations/migrations.go new file mode 100644 index 000000000000..3bdf2a390bfb --- /dev/null +++ b/pkg/migrations/migrations.go @@ -0,0 +1,237 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package migrations + +import ( + "time" + + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/pkg/errors" +) + +var ( + leaseDuration = time.Minute + leaseRefreshInterval = leaseDuration / 5 +) + +// backwardCompatibleMigrations is a hard-coded list of migrations to be run on +// startup. They will always be run from top-to-bottom, and because they are +// assumed to be backward-compatible, they will be run regardless of what other +// node versions are currently running within the cluster. +var backwardCompatibleMigrations = []migrationDescriptor{ + { + name: "example migration", + workFn: exampleNoopMigration, + }, +} + +// migrationDescriptor describes a single migration hook that's used to modify +// some part of the cluster state when the CockroachDB version is upgraded. +// See docs/RFCs/cluster_upgrade_tool.md for details. +type migrationDescriptor struct { + // name must be unique amongst all hard-coded migrations. 
+ name string + // workFn must be idempotent so that we can safely re-run it if a node failed + // while running it. + workFn func(context.Context, runner) error +} + +type runner struct { + db db + sqlExecutor *sql.Executor +} + +// leaseManager is defined just to allow us to use a fake client.LeaseManager +// when testing this package. +type leaseManager interface { + AcquireLease(ctx context.Context, key roachpb.Key) (*client.Lease, error) + ExtendLease(ctx context.Context, l *client.Lease) error + ReleaseLease(ctx context.Context, l *client.Lease) error + TimeRemaining(l *client.Lease) time.Duration +} + +// db is defined just to allow us to use a fake client.DB when testing this +// package. +type db interface { + Scan(ctx context.Context, begin, end interface{}, maxRows int64) ([]client.KeyValue, error) + Put(ctx context.Context, key, value interface{}) error +} + +// Manager encapsulates the necessary functionality for handling migrations +// of data in the cluster. +type Manager struct { + stopper *stop.Stopper + leaseManager leaseManager + db db + sqlExecutor *sql.Executor +} + +// NewManager initializes and returns a new Manager object. +func NewManager( + stopper *stop.Stopper, db *client.DB, executor *sql.Executor, clock *hlc.Clock, clientID string, +) *Manager { + opts := client.LeaseManagerOptions{ + ClientID: clientID, + LeaseDuration: leaseDuration, + } + return &Manager{ + stopper: stopper, + leaseManager: client.NewLeaseManager(db, clock, opts), + db: db, + sqlExecutor: executor, + } +} + +// EnsureMigrations should be run during node startup to ensure that all +// required migrations have been run (and running all those that are definitely +// safe to run). +func (m *Manager) EnsureMigrations(ctx context.Context) error { + // First, check whether there are any migrations that need to be run. + completedMigrations, err := m.getCompletedMigrations(ctx) + if err != nil { + return err + } + allMigrationsCompleted := true + for _, migration := range backwardCompatibleMigrations { + key := migrationKey(migration) + if _, ok := completedMigrations[string(key)]; !ok { + allMigrationsCompleted = false + } + } + if allMigrationsCompleted { + return nil + } + + // If there are any, grab the migration lease to ensure that only one + // node is ever doing migrations at a time. + // Note that we shouldn't ever let client.LeaseNotAvailableErrors cause us + // to stop trying, because if we return an error the server will be shut down, + // and this server being down may prevent the leaseholder from finishing. + var lease *client.Lease + if log.V(1) { + log.Info(ctx, "trying to acquire lease") + } + for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { + lease, err = m.leaseManager.AcquireLease(ctx, keys.MigrationLease) + if err == nil { + break + } + log.Errorf(ctx, "failed attempt to acquire migration lease: %s", err) + } + if err != nil { + return errors.Wrapf(err, "failed to acquire lease for running necessary migrations") + } + + // Ensure that we hold the lease throughout the migration process and release + // it when we're done. 
+ done := make(chan interface{}, 1) + defer func() { + done <- nil + if log.V(1) { + log.Info(ctx, "trying to release the lease") + } + if err := m.leaseManager.ReleaseLease(ctx, lease); err != nil { + log.Errorf(ctx, "failed to release migration lease: %s", err) + } + }() + if err := m.stopper.RunAsyncTask(ctx, func(ctx context.Context) { + select { + case <-done: + return + case <-time.After(leaseRefreshInterval): + if err := m.leaseManager.ExtendLease(ctx, lease); err != nil { + log.Warningf(ctx, "unable to extend ownership of expiration lease: %s", err) + } + if m.leaseManager.TimeRemaining(lease) < leaseRefreshInterval { + // Note that we may be able to do better than this by influencing the + // deadline of migrations' transactions based on the least expiration + // time, but simply kill the process for now for the sake of simplicity. + log.Fatal(ctx, "not enough time left on migration lease, terminating for safety") + } + } + }); err != nil { + return err + } + + // Re-get the list of migrations in case any of them were completed between + // our initial check and our grabbing of the lease. + completedMigrations, err = m.getCompletedMigrations(ctx) + if err != nil { + return err + } + + startTime := timeutil.Now().String() + r := runner{ + db: m.db, + sqlExecutor: m.sqlExecutor, + } + for _, migration := range backwardCompatibleMigrations { + key := migrationKey(migration) + if _, ok := completedMigrations[string(key)]; ok { + continue + } + + if log.V(1) { + log.Infof(ctx, "running migration %q", migration.name) + } + if err := migration.workFn(ctx, r); err != nil { + return errors.Wrapf(err, "failed to run migration %q", migration.name) + } + + if log.V(1) { + log.Infof(ctx, "trying to persist record of completing migration %s", migration.name) + } + if err := m.db.Put(ctx, key, startTime); err != nil { + return errors.Wrapf(err, "failed to persist record of completing migration %q", + migration.name) + } + } + + return nil +} + +func (m *Manager) getCompletedMigrations(ctx context.Context) (map[string]struct{}, error) { + if log.V(1) { + log.Info(ctx, "trying to get the list of completed migrations") + } + keyvals, err := m.db.Scan(ctx, keys.MigrationPrefix, keys.MigrationKeyMax, 0 /* maxRows */) + if err != nil { + return nil, errors.Wrapf(err, "failed to get list of completed migrations") + } + completedMigrations := make(map[string]struct{}) + for _, keyval := range keyvals { + completedMigrations[string(keyval.Key)] = struct{}{} + } + return completedMigrations, nil +} + +func migrationKey(migration migrationDescriptor) roachpb.Key { + return append(keys.MigrationPrefix, roachpb.RKey(migration.name)...) +} + +func exampleNoopMigration(ctx context.Context, r runner) error { + return nil +} diff --git a/pkg/migrations/migrations_test.go b/pkg/migrations/migrations_test.go new file mode 100644 index 000000000000..fe90125800e5 --- /dev/null +++ b/pkg/migrations/migrations_test.go @@ -0,0 +1,330 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. 
See the License for the specific language governing +// permissions and limitations under the License. + +package migrations + +import ( + "bytes" + "fmt" + "os" + "testing" + "time" + + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/pkg/errors" +) + +var ( + noopMigration1 = migrationDescriptor{ + name: "noop 1", + workFn: func(_ context.Context, _ runner) error { return nil }, + } + noopMigration2 = migrationDescriptor{ + name: "noop 2", + workFn: func(_ context.Context, _ runner) error { return nil }, + } + errorMigration = migrationDescriptor{ + name: "error", + workFn: func(_ context.Context, _ runner) error { return errors.New("errorMigration") }, + } + panicMigration = migrationDescriptor{ + name: "panic", + workFn: func(_ context.Context, _ runner) error { panic("panicMigration") }, + } +) + +type fakeLeaseManager struct { + extendErr error + releaseErr error + leaseTimeRemaining time.Duration +} + +func (f *fakeLeaseManager) AcquireLease( + ctx context.Context, key roachpb.Key, +) (*client.Lease, error) { + return &client.Lease{}, nil +} + +func (f *fakeLeaseManager) ExtendLease(ctx context.Context, l *client.Lease) error { + return f.extendErr +} + +func (f *fakeLeaseManager) ReleaseLease(ctx context.Context, l *client.Lease) error { + return f.releaseErr +} + +func (f *fakeLeaseManager) TimeRemaining(l *client.Lease) time.Duration { + // Default to a reasonable amount of time left if the field wasn't set. + if f.leaseTimeRemaining == 0 { + return leaseRefreshInterval * 2 + } + return f.leaseTimeRemaining +} + +type fakeDB struct { + kvs map[string][]byte + scanErr error + putErr error +} + +func (f *fakeDB) Scan( + ctx context.Context, begin, end interface{}, maxRows int64, +) ([]client.KeyValue, error) { + if f.scanErr != nil { + return nil, f.scanErr + } + if !bytes.Equal(begin.(roachpb.Key), keys.MigrationPrefix) { + return nil, errors.Errorf("expected begin key %q, got %q", keys.MigrationPrefix, begin) + } + if !bytes.Equal(end.(roachpb.Key), keys.MigrationKeyMax) { + return nil, errors.Errorf("expected end key %q, got %q", keys.MigrationKeyMax, end) + } + var results []client.KeyValue + for k, v := range f.kvs { + results = append(results, client.KeyValue{ + Key: []byte(k), + Value: &roachpb.Value{RawBytes: v}, + }) + } + return results, nil +} + +func (f *fakeDB) Put(ctx context.Context, key, value interface{}) error { + if f.putErr != nil { + return f.putErr + } + f.kvs[string(key.(roachpb.Key))] = []byte(value.(string)) + return nil +} + +func TestEnsureMigrations(t *testing.T) { + db := &fakeDB{} + mgr := Manager{ + stopper: stop.NewStopper(), + leaseManager: &fakeLeaseManager{}, + db: db, + } + defer mgr.stopper.Stop() + + fnGotCalled := false + fnGotCalledDescriptor := migrationDescriptor{ + name: "got-called-verifier", + workFn: func(context.Context, runner) error { + fnGotCalled = true + return nil + }, + } + testCases := []struct { + preCompleted []migrationDescriptor + migrations []migrationDescriptor + expectedErr string + }{ + { + nil, + nil, + "", + }, + { + nil, + []migrationDescriptor{noopMigration1}, + "", + }, + { + []migrationDescriptor{noopMigration1}, + []migrationDescriptor{noopMigration1}, + "", + }, + { + []migrationDescriptor{}, + 
[]migrationDescriptor{noopMigration1, noopMigration2}, + "", + }, + { + []migrationDescriptor{noopMigration1}, + []migrationDescriptor{noopMigration1, noopMigration2}, + "", + }, + { + []migrationDescriptor{noopMigration1, noopMigration2, panicMigration}, + []migrationDescriptor{noopMigration1, noopMigration2, panicMigration}, + "", + }, + { + []migrationDescriptor{noopMigration1, noopMigration2}, + []migrationDescriptor{noopMigration1, noopMigration2, fnGotCalledDescriptor}, + "", + }, + { + []migrationDescriptor{noopMigration1, noopMigration2}, + []migrationDescriptor{noopMigration1, noopMigration2, errorMigration}, + fmt.Sprintf("failed to run migration %q", errorMigration.name), + }, + } + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + db.kvs = make(map[string][]byte) + for _, name := range tc.preCompleted { + db.kvs[string(migrationKey(name))] = []byte{} + } + backwardCompatibleMigrations = tc.migrations + err := mgr.EnsureMigrations(context.Background()) + if !testutils.IsError(err, tc.expectedErr) { + t.Errorf("expected error %q, got error %v", tc.expectedErr, err) + } + if err != nil { + return + } + for _, migration := range tc.migrations { + if _, ok := db.kvs[string(migrationKey(migration))]; !ok { + t.Errorf("expected key %s to be written, but it wasn't", migrationKey(migration)) + } + } + if len(db.kvs) != len(tc.migrations) { + t.Errorf("expected %d key to be written, but %d were", + len(tc.migrations), len(db.kvs)) + } + }) + } + if !fnGotCalled { + t.Errorf("expected fnGotCalledDescriptor to be run by the migration coordinator, but it wasn't") + } +} + +func TestDBErrors(t *testing.T) { + db := &fakeDB{} + mgr := Manager{ + stopper: stop.NewStopper(), + leaseManager: &fakeLeaseManager{}, + db: db, + } + defer mgr.stopper.Stop() + + migration := noopMigration1 + backwardCompatibleMigrations = []migrationDescriptor{migration} + testCases := []struct { + scanErr error + putErr error + expectedErr string + }{ + { + nil, + nil, + "", + }, + { + fmt.Errorf("context deadline exceeded"), + nil, + "failed to get list of completed migrations.*context deadline exceeded", + }, + { + nil, + fmt.Errorf("context deadline exceeded"), + "failed to persist record of completing migration.*context deadline exceeded", + }, + } + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + db.scanErr = tc.scanErr + db.putErr = tc.putErr + db.kvs = make(map[string][]byte) + err := mgr.EnsureMigrations(context.Background()) + if !testutils.IsError(err, tc.expectedErr) { + t.Errorf("expected error %q, got error %v", tc.expectedErr, err) + } + if err != nil { + return + } + if _, ok := db.kvs[string(migrationKey(migration))]; !ok { + t.Errorf("expected key %s to be written, but it wasn't", migrationKey(migration)) + } + if len(db.kvs) != len(backwardCompatibleMigrations) { + t.Errorf("expected %d key to be written, but %d were", + len(backwardCompatibleMigrations), len(db.kvs)) + } + }) + } +} + +// ExtendLease and ReleaseLease errors should not, by themselves, cause the +// migration process to fail. Not being able to acquire the lease should, but +// we don't test that here due to the added code that would be needed to change +// its retry settings to allow for testing it in a reasonable amount of time. 
+func TestLeaseErrors(t *testing.T) { + db := &fakeDB{kvs: make(map[string][]byte)} + mgr := Manager{ + stopper: stop.NewStopper(), + leaseManager: &fakeLeaseManager{ + extendErr: fmt.Errorf("context deadline exceeded"), + releaseErr: fmt.Errorf("context deadline exceeded"), + }, + db: db, + } + defer mgr.stopper.Stop() + + migration := noopMigration1 + backwardCompatibleMigrations = []migrationDescriptor{migration} + if err := mgr.EnsureMigrations(context.Background()); err != nil { + t.Error(err) + } + if _, ok := db.kvs[string(migrationKey(migration))]; !ok { + t.Errorf("expected key %s to be written, but it wasn't", migrationKey(migration)) + } + if len(db.kvs) != len(backwardCompatibleMigrations) { + t.Errorf("expected %d key to be written, but %d were", + len(backwardCompatibleMigrations), len(db.kvs)) + } +} + +// The lease not having enough time left on it to finish migrations should +// cause the process to exit via a call to log.Fatal. +func TestLeaseExpiration(t *testing.T) { + db := &fakeDB{kvs: make(map[string][]byte)} + mgr := Manager{ + stopper: stop.NewStopper(), + leaseManager: &fakeLeaseManager{leaseTimeRemaining: time.Nanosecond}, + db: db, + } + defer mgr.stopper.Stop() + + oldLeaseRefreshInterval := leaseRefreshInterval + leaseRefreshInterval = time.Microsecond + defer func() { leaseRefreshInterval = oldLeaseRefreshInterval }() + + exitCalled := make(chan bool) + log.SetExitFunc(func(int) { exitCalled <- true }) + defer log.SetExitFunc(os.Exit) + + waitForExitMigration := migrationDescriptor{ + name: "wait for exit to be called", + workFn: func(context.Context, runner) error { + select { + case <-exitCalled: + return nil + case <-time.After(10 * time.Second): + return fmt.Errorf("timed out waiting for exit to be called") + } + }, + } + backwardCompatibleMigrations = []migrationDescriptor{waitForExitMigration} + if err := mgr.EnsureMigrations(context.Background()); err != nil { + t.Error(err) + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index c8159c5adcd9..35d354422bf1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/migrations" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -506,7 +507,17 @@ func (s *Server) Start(ctx context.Context) error { netutil.FatalIfUnexpected(s.grpc.Serve(anyL)) }) + // Running the SQL migrations safely requires that we aren't serving SQL + // requests at the same time -- to ensure that, block the serving of SQL + // traffic until the migrations are done, as indicated by this channel. + serveSQL := make(chan bool) + s.stopper.RunWorker(func() { + select { + case <-serveSQL: + case <-s.stopper.ShouldQuiesce(): + return + } pgCtx := s.pgServer.AmbientCtx.AnnotateCtx(context.Background()) netutil.FatalIfUnexpected(httpServer.ServeWith(s.stopper, pgL, func(conn net.Conn) { connCtx := log.WithLogTagStr(pgCtx, "client", conn.RemoteAddr().String()) @@ -584,8 +595,6 @@ func (s *Server) Start(ctx context.Context) error { } log.Event(ctx, "started node") - s.nodeLiveness.StartHeartbeat(ctx, s.stopper) - // We can now add the node registry. 
s.recorder.AddNode(s.registry, s.node.Descriptor, s.node.startedAt) @@ -635,6 +644,11 @@ func (s *Server) Start(ctx context.Context) error { }) s.stopper.RunWorker(func() { + select { + case <-serveSQL: + case <-s.stopper.ShouldQuiesce(): + return + } pgCtx := s.pgServer.AmbientCtx.AnnotateCtx(context.Background()) netutil.FatalIfUnexpected(httpServer.ServeWith(s.stopper, unixLn, func(conn net.Conn) { connCtx := log.WithLogTagStr(pgCtx, "client", conn.RemoteAddr().String()) @@ -650,6 +664,21 @@ func (s *Server) Start(ctx context.Context) error { log.Event(ctx, "accepting connections") + s.nodeLiveness.StartHeartbeat(ctx, s.stopper) + + // Before serving SQL requests, we have to make sure the database is + // in an acceptable form for this version of the software. + // We have to do this after actually starting up the server to be able to + // seamlessly use the kv client against other nodes in the cluster. + migMgr := migrations.NewManager( + s.stopper, s.db, s.sqlExecutor, s.clock, s.NodeID().String()) + if err := migMgr.EnsureMigrations(ctx); err != nil { + log.Fatal(ctx, err) + } + log.Infof(ctx, "done ensuring all necessary migrations have run") + close(serveSQL) + log.Info(ctx, "serving sql connections") + // Initialize grpc-gateway mux and context. jsonpb := &protoutil.JSONPb{ EnumsAsInts: true, From 967327dc8a7c68b7a63c3bbbb151540c7b900b4f Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Tue, 29 Nov 2016 11:30:32 -0500 Subject: [PATCH 3/3] acceptance: Wait in StartCluster until nodes can serve SQL Now that there's a delay between when a node starts serving kv and when it starts serving SQL (due to the SQL migration code path), acceptance tests that rely on SQL can have a bad time without this. --- pkg/acceptance/util.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/acceptance/util.go b/pkg/acceptance/util.go index 33eb341346ce..c47adef497fa 100644 --- a/pkg/acceptance/util.go +++ b/pkg/acceptance/util.go @@ -422,6 +422,21 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c }) } + // Ensure that all nodes are serving SQL by making sure a simple + // read-only query succeeds. + for i := 0; i < c.NumNodes(); i++ { + testutils.SucceedsSoon(t, func() error { + db, err := gosql.Open("postgres", c.PGUrl(ctx, i)) + if err != nil { + return err + } + if _, err := db.Exec("SHOW DATABASES;"); err != nil { + return err + } + return nil + }) + } + completed = true return c }
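For reference, here is a minimal usage sketch (not part of any of these
patches) of the client.LeaseManager introduced in the first patch; the ctx,
db, and clock variables and the chosen durations are assumed to exist in the
caller and are illustrative only:

    // Hypothetical caller of the lease manager added in patch 1/3.
    lm := client.NewLeaseManager(db, clock, client.LeaseManagerOptions{
        ClientID:      "node-1",        // must be unique per LeaseManager
        LeaseDuration: 2 * time.Minute, // <= 0 falls back to client.DefaultLeaseDuration
    })
    lease, err := lm.AcquireLease(ctx, keys.MigrationLease)
    if err != nil {
        // A *client.LeaseNotAvailableError means another client holds the
        // lease and it has not expired yet.
        return err
    }
    defer func() {
        if err := lm.ReleaseLease(ctx, lease); err != nil {
            log.Warningf(ctx, "failed to release lease: %s", err)
        }
    }()
    // Long-running work should extend the lease before it runs out.
    if lm.TimeRemaining(lease) < 30*time.Second {
        if err := lm.ExtendLease(ctx, lease); err != nil {
            return err
        }
    }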