Revision 1db176151b79ff72072668c0d4a041be222996a6 authored by Yicheng Qin on 14 July 2015, 23:19:11 UTC, committed by Yicheng Qin on 14 July 2015, 23:33:28 UTC
The discovery service doesn't return HTTP header early when watch
starts. This may trigger ResponseHeaderTimeout and cause the watch
request failed.

The fix on discovery service may take some time. Remove the
ResponseHeaderTimeout first so it behaves as before.
1 parent 953a59d
Raw File
storage.go
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"io"
	"os"
	"path"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/migrate"
	"github.com/coreos/etcd/pkg/pbutil"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"
	"github.com/coreos/etcd/version"
	"github.com/coreos/etcd/wal"
	"github.com/coreos/etcd/wal/walpb"
)

type Storage interface {
	// Save function saves ents and state to the underlying stable storage.
	// Save MUST block until st and ents are on stable storage.
	Save(st raftpb.HardState, ents []raftpb.Entry) error
	// SaveSnap function saves snapshot to the underlying stable storage.
	SaveSnap(snap raftpb.Snapshot) error
	// Close closes the Storage and performs finalization.
	Close() error
}

type storage struct {
	*wal.WAL
	*snap.Snapshotter
}

func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
	return &storage{w, s}
}

// SaveSnap saves the snapshot to disk and release the locked
// wal files since they will not be used.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
	walsnap := walpb.Snapshot{
		Index: snap.Metadata.Index,
		Term:  snap.Metadata.Term,
	}
	err := st.WAL.SaveSnapshot(walsnap)
	if err != nil {
		return err
	}
	err = st.Snapshotter.SaveSnap(snap)
	if err != nil {
		return err
	}
	err = st.WAL.ReleaseLockTo(snap.Metadata.Index)
	if err != nil {
		return err
	}
	return nil
}

func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
	var (
		err       error
		wmetadata []byte
	)

	repaired := false
	for {
		if w, err = wal.Open(waldir, snap); err != nil {
			plog.Fatalf("open wal error: %v", err)
		}
		if wmetadata, st, ents, err = w.ReadAll(); err != nil {
			w.Close()
			// we can only repair ErrUnexpectedEOF and we never repair twice.
			if repaired || err != io.ErrUnexpectedEOF {
				plog.Fatalf("read wal error (%v) and cannot be repaired", err)
			}
			if !wal.Repair(waldir) {
				plog.Fatalf("WAL error (%v) cannot be repaired", err)
			} else {
				plog.Infof("repaired WAL error (%v)", err)
				repaired = true
			}
			continue
		}
		break
	}
	var metadata pb.Metadata
	pbutil.MustUnmarshal(&metadata, wmetadata)
	id = types.ID(metadata.NodeID)
	cid = types.ID(metadata.ClusterID)
	return
}

// upgradeWAL converts an older version of the etcdServer data to the newest version.
// It must ensure that, after upgrading, the most recent version is present.
func upgradeDataDir(baseDataDir string, name string, ver version.DataDirVersion) error {
	switch ver {
	case version.DataDir0_4:
		plog.Infof("converting v0.4 log to v2.0")
		err := migrate.Migrate4To2(baseDataDir, name)
		if err != nil {
			plog.Fatalf("failed to migrate data-dir (%v)", err)
			return err
		}
		fallthrough
	case version.DataDir2_0:
		err := makeMemberDir(baseDataDir)
		if err != nil {
			return err
		}
		fallthrough
	case version.DataDir2_0_1:
		fallthrough
	default:
	}
	return nil
}

func makeMemberDir(dir string) error {
	membdir := path.Join(dir, "member")
	_, err := os.Stat(membdir)
	switch {
	case err == nil:
		return nil
	case !os.IsNotExist(err):
		return err
	}
	if err := os.MkdirAll(membdir, 0700); err != nil {
		return err
	}
	names := []string{"snap", "wal"}
	for _, name := range names {
		if err := os.Rename(path.Join(dir, name), path.Join(membdir, name)); err != nil {
			return err
		}
	}
	return nil
}
back to top