https://github.com/coreos/etcd
Tip revision: b7ff47f9d50641cfe30b278beffe34ea71cafc46, authored by Joe Betz on 15 June 2018, 16:47:04 UTC ("version: bump up to 3.1.18")
balancer.go
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"net/url"
	"strings"
	"sync"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// ErrNoAddrAvilable is returned by Get() when the balancer has no active
// connection to any endpoint at the time.
// This error is returned only when opts.BlockingWait is false (fail-fast RPCs).
var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")

// simpleBalancer does the bare minimum to expose multiple endpoints to the
// gRPC reconnection code path.
type simpleBalancer struct {
	// addrs are the client's endpoints for grpc
	addrs []grpc.Address
	// notifyCh notifies grpc of the set of addresses for connecting
	notifyCh chan []grpc.Address

	// readyc closes once the first connection is up
	readyc    chan struct{}
	readyOnce sync.Once

	// mu protects upEps, upc, pinAddr, host2ep, and closed
	mu sync.RWMutex
	// upEps holds the current endpoints that have an active connection
	upEps map[string]struct{}
	// upc closes when upEps transitions from empty to non-empty, or when the balancer closes.
	upc chan struct{}

	// gRPC issues TLS cert checks using the string passed into Dial, so that
	// string must be the host. To recover the full scheme://host URL, keep a
	// map from hosts to the original endpoints.
	host2ep map[string]string

	// pinAddr is the currently pinned address; set to the empty string on
	// initialization and shutdown.
	pinAddr string

	closed bool
}

func newSimpleBalancer(eps []string) *simpleBalancer {
	notifyCh := make(chan []grpc.Address, 1)
	addrs := make([]grpc.Address, len(eps))
	for i := range eps {
		addrs[i].Addr = getHost(eps[i])
	}
	notifyCh <- addrs
	sb := &simpleBalancer{
		addrs:    addrs,
		notifyCh: notifyCh,
		readyc:   make(chan struct{}),
		upEps:    make(map[string]struct{}),
		upc:      make(chan struct{}),
		host2ep:  getHost2ep(eps),
	}
	return sb
}

func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error { return nil }

func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.upc
}

func (b *simpleBalancer) getEndpoint(host string) string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.host2ep[host]
}

func getHost2ep(eps []string) map[string]string {
	hm := make(map[string]string, len(eps))
	for i := range eps {
		_, host, _ := parseEndpoint(eps[i])
		hm[host] = eps[i]
	}
	return hm
}
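
// Illustration of the mapping built above (example values, not from the
// original code): assuming parseEndpoint, defined elsewhere in this package,
// strips the scheme from a URL-style endpoint, then
//
//	getHost2ep([]string{"https://10.0.0.1:2379", "10.0.0.2:2379"})
//
// returns {"10.0.0.1:2379": "https://10.0.0.1:2379", "10.0.0.2:2379": "10.0.0.2:2379"}.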

func (b *simpleBalancer) updateAddrs(eps []string) {
	np := getHost2ep(eps)

	b.mu.Lock()
	defer b.mu.Unlock()

	match := len(np) == len(b.host2ep)
	for k, v := range np {
		if b.host2ep[k] != v {
			match = false
			break
		}
	}
	if match {
		// same endpoints, so no need to update the addresses
		return
	}

	b.host2ep = np

	addrs := make([]grpc.Address, 0, len(eps))
	for i := range eps {
		addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
	}
	b.addrs = addrs
	b.notifyCh <- addrs
}

func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// gRPC might call Up after it has called Close. We add this check at the
	// application layer to guard against that; otherwise simpleBalancer might
	// panic since b.upc is already closed.
	if b.closed {
		return func(err error) {}
	}

	if len(b.upEps) == 0 {
		// notify waiting Get()s and pin first connected address
		close(b.upc)
		b.pinAddr = addr.Addr
	}
	b.upEps[addr.Addr] = struct{}{}

	// notify client that a connection is up
	b.readyOnce.Do(func() { close(b.readyc) })

	return func(err error) {
		b.mu.Lock()
		delete(b.upEps, addr.Addr)
		if len(b.upEps) == 0 && b.pinAddr != "" {
			b.upc = make(chan struct{})
		} else if b.pinAddr == addr.Addr {
			// choose new random up endpoint
			for k := range b.upEps {
				b.pinAddr = k
				break
			}
		}
		b.mu.Unlock()
	}
}

func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
	var addr string

	// If opts.BlockingWait is false (for fail-fast RPCs), it should immediately
	// return an address it has already notified via Notify instead of blocking.
	if !opts.BlockingWait {
		b.mu.RLock()
		closed := b.closed
		addr = b.pinAddr
		upEps := len(b.upEps)
		b.mu.RUnlock()
		if closed {
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		}

		if upEps == 0 {
			return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable
		}
		return grpc.Address{Addr: addr}, func() {}, nil
	}

	for {
		b.mu.RLock()
		ch := b.upc
		b.mu.RUnlock()
		select {
		case <-ch:
		case <-ctx.Done():
			return grpc.Address{Addr: ""}, nil, ctx.Err()
		}
		b.mu.RLock()
		addr = b.pinAddr
		upEps := len(b.upEps)
		b.mu.RUnlock()
		if addr == "" {
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		}
		if upEps > 0 {
			break
		}
	}
	return grpc.Address{Addr: addr}, func() {}, nil
}
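
// Note (an observation about the gRPC releases this balancer was written
// against, not behavior enforced by this file): gRPC derives BlockingWait from
// the per-call fail-fast setting, so an RPC issued with grpc.FailFast(false)
// takes the blocking path in Get above, while default fail-fast calls hit the
// non-blocking branch.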

func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }

func (b *simpleBalancer) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	// Guard against gRPC calling Close twice. TODO: remove this check once we
	// are sure that gRPC won't call Close twice.
	if b.closed {
		return nil
	}
	b.closed = true
	close(b.notifyCh)
	// terminate all waiting Get()s
	b.pinAddr = ""
	if len(b.upEps) == 0 {
		close(b.upc)
	}
	return nil
}

func getHost(ep string) string {
	url, uerr := url.Parse(ep)
	if uerr != nil || !strings.Contains(ep, "://") {
		return ep
	}
	return url.Host
}
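
// exampleSimpleBalancerUsage is an illustrative sketch, not part of the
// original file: it shows how a caller (for example, the clientv3 Client) might
// wire simpleBalancer into grpc.Dial and wait for the first connection. The
// dial options used here are assumptions for the example; eps is assumed to be
// non-empty.
func exampleSimpleBalancerUsage(ctx context.Context, eps []string) (*grpc.ClientConn, error) {
	sb := newSimpleBalancer(eps)
	// grpc.WithBalancer hands address selection to simpleBalancer; the target
	// string matters less here because the balancer notifies the real addresses.
	conn, err := grpc.Dial(eps[0], grpc.WithInsecure(), grpc.WithBalancer(sb))
	if err != nil {
		return nil, err
	}
	// Block until at least one endpoint reports Up, or the caller gives up.
	select {
	case <-sb.ConnectNotify():
	case <-ctx.Done():
		conn.Close()
		return nil, ctx.Err()
	}
	return conn, nil
}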