https://github.com/google/cayley
Raw File
Tip revision: a04deabbecb2d3cc4142f6bda632dd3e07fb6a86 authored by Luis Fung on 14 February 2019, 20:20:11 UTC
Merge branch 'master' of https://github.com/fungl164/cayley
Tip revision: a04deab
quadwriter.go
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package graph

// Defines the interface for consistent replication of a graph instance.
//
// Separate from the backend, this dictates how individual quads get
// identified and replicated consistently across (potentially) multiple
// instances. The simplest case is to keep an append-only log of quad
// changes.

import (
	"context"
	"errors"
	"io"

	"github.com/cayleygraph/cayley/quad"
)

type Procedure int8

func (p Procedure) String() string {
	switch p {
	case +1:
		return "add"
	case -1:
		return "delete"
	default:
		return "invalid"
	}
}

// The different types of actions a transaction can do.
const (
	Add    Procedure = +1
	Delete Procedure = -1
)

type Delta struct {
	Quad   quad.Quad
	Action Procedure
}

// Unwrap returns an original QuadStore value if it was wrapped by Handle.
// This prevents shadowing of optional interface implementations.
func Unwrap(qs QuadStore) QuadStore {
	if h, ok := qs.(*Handle); ok {
		return h.QuadStore
	}
	return qs
}

type Handle struct {
	QuadStore
	QuadWriter
}

type IgnoreOpts struct {
	IgnoreDup, IgnoreMissing bool
}

func (h *Handle) Close() error {
	err := h.QuadWriter.Close()
	h.QuadStore.Close()
	return err
}

var (
	ErrQuadExists    = errors.New("quad exists")
	ErrQuadNotExist  = errors.New("quad does not exist")
	ErrInvalidAction = errors.New("invalid action")
	ErrNodeNotExists = errors.New("node does not exist")
)

// DeltaError records an error and the delta that caused it.
type DeltaError struct {
	Delta Delta
	Err   error
}

func (e *DeltaError) Error() string {
	if !e.Delta.Quad.IsValid() {
		return e.Err.Error()
	}
	return e.Delta.Action.String() + " " + e.Delta.Quad.String() + ": " + e.Err.Error()
}

// IsQuadExist returns whether an error is a DeltaError
// with the Err field equal to ErrQuadExists.
func IsQuadExist(err error) bool {
	if err == ErrQuadExists {
		return true
	}
	de, ok := err.(*DeltaError)
	return ok && de.Err == ErrQuadExists
}

// IsQuadNotExist returns whether an error is a DeltaError
// with the Err field equal to ErrQuadNotExist.
func IsQuadNotExist(err error) bool {
	if err == ErrQuadNotExist {
		return true
	}
	de, ok := err.(*DeltaError)
	return ok && de.Err == ErrQuadNotExist
}

// IsInvalidAction returns whether an error is a DeltaError
// with the Err field equal to ErrInvalidAction.
func IsInvalidAction(err error) bool {
	if err == ErrInvalidAction {
		return true
	}
	de, ok := err.(*DeltaError)
	return ok && de.Err == ErrInvalidAction
}

var (
	// IgnoreDuplicates specifies whether duplicate quads
	// cause an error during loading or are ignored.
	IgnoreDuplicates = true

	// IgnoreMissing specifies whether missing quads
	// cause an error during deletion or are ignored.
	IgnoreMissing = false
)

type QuadWriter interface {
	// AddQuad adds a quad to the store.
	AddQuad(quad.Quad) error

	// TODO(barakmich): Deprecate in favor of transaction.
	// AddQuadSet adds a set of quads to the store, atomically if possible.
	AddQuadSet([]quad.Quad) error

	// RemoveQuad removes a quad matching the given one  from the database,
	// if it exists. Does nothing otherwise.
	RemoveQuad(quad.Quad) error

	// ApplyTransaction applies a set of quad changes.
	ApplyTransaction(*Transaction) error

	// RemoveNode removes all quads which have the given node as subject, predicate, object, or label.
	//
	// It returns ErrNodeNotExists if node is missing.
	RemoveNode(quad.Value) error

	// Close cleans up replication and closes the writing aspect of the database.
	Close() error
}

type NewQuadWriterFunc func(QuadStore, Options) (QuadWriter, error)

var writerRegistry = make(map[string]NewQuadWriterFunc)

func RegisterWriter(name string, newFunc NewQuadWriterFunc) {
	if _, found := writerRegistry[name]; found {
		panic("already registered QuadWriter " + name)
	}
	writerRegistry[name] = newFunc
}

func NewQuadWriter(name string, qs QuadStore, opts Options) (QuadWriter, error) {
	newFunc, hasNew := writerRegistry[name]
	if !hasNew {
		return nil, errors.New("replication: name '" + name + "' is not registered")
	}
	return newFunc(qs, opts)
}

func WriterMethods() []string {
	t := make([]string, 0, len(writerRegistry))
	for n := range writerRegistry {
		t = append(t, n)
	}
	return t
}

type BatchWriter interface {
	quad.WriteCloser
	quad.BatchWriter
	Flush() error
}

// NewWriter creates a quad writer for a given QuadStore.
//
// Caller must call Flush or Close to flush an internal buffer.
func NewWriter(qs QuadWriter) BatchWriter {
	return &batchWriter{qs: qs}
}

type batchWriter struct {
	qs  QuadWriter
	buf []quad.Quad
}

func (w *batchWriter) flushBuffer(force bool) error {
	if !force && len(w.buf) < quad.DefaultBatch {
		return nil
	}
	_, err := w.WriteQuads(w.buf)
	w.buf = w.buf[:0]
	return err
}

func (w *batchWriter) WriteQuad(q quad.Quad) error {
	if err := w.flushBuffer(false); err != nil {
		return err
	}
	w.buf = append(w.buf, q)
	return nil
}
func (w *batchWriter) WriteQuads(quads []quad.Quad) (int, error) {
	if err := w.qs.AddQuadSet(quads); err != nil {
		return 0, err
	}
	return len(quads), nil
}
func (w *batchWriter) Flush() error {
	return w.flushBuffer(true)
}
func (w *batchWriter) Close() error {
	return w.Flush()
}

// NewTxWriter creates a writer that applies a given procedures for all quads in stream.
// If procedure is zero, Add operation will be used.
func NewTxWriter(tx *Transaction, p Procedure) quad.Writer {
	if p == 0 {
		p = Add
	}
	return &txWriter{tx: tx, p: p}
}

type txWriter struct {
	tx *Transaction
	p  Procedure
}

func (w *txWriter) WriteQuad(q quad.Quad) error {
	switch w.p {
	case Add:
		w.tx.AddQuad(q)
	case Delete:
		w.tx.RemoveQuad(q)
	default:
		return ErrInvalidAction
	}
	return nil
}

// NewRemover creates a quad writer for a given QuadStore which removes quads instead of adding them.
func NewRemover(qs QuadWriter) BatchWriter {
	return &removeWriter{qs: qs}
}

type removeWriter struct {
	qs QuadWriter
}

func (w *removeWriter) WriteQuad(q quad.Quad) error {
	return w.qs.RemoveQuad(q)
}
func (w *removeWriter) WriteQuads(quads []quad.Quad) (int, error) {
	tx := NewTransaction()
	for _, q := range quads {
		tx.RemoveQuad(q)
	}
	if err := w.qs.ApplyTransaction(tx); err != nil {
		return 0, err
	}
	return len(quads), nil
}
func (w *removeWriter) Flush() error {
	return nil // TODO: batch deletes automatically
}
func (w *removeWriter) Close() error { return nil }

// NewResultReader creates a quad reader for a given QuadStore.
func NewQuadStoreReader(qs QuadStore) quad.ReadSkipCloser {
	return NewResultReader(qs, nil)
}

// NewResultReader creates a quad reader for a given QuadStore and iterator.
// If iterator is nil QuadsAllIterator will be used.
//
// Only quads returned by iterator's Result will be used.
//
// Iterator will be closed with the reader.
func NewResultReader(qs QuadStore, it Iterator) quad.ReadSkipCloser {
	if it == nil {
		it = qs.QuadsAllIterator()
	}
	return &quadReader{qs: qs, it: it}
}

type quadReader struct {
	qs QuadStore
	it Iterator
}

func (r *quadReader) ReadQuad() (quad.Quad, error) {
	if r.it.Next(context.TODO()) {
		return r.qs.Quad(r.it.Result()), nil
	}
	err := r.it.Err()
	if err == nil {
		err = io.EOF
	}
	return quad.Quad{}, err
}
func (r *quadReader) SkipQuad() error {
	if r.it.Next(context.TODO()) {
		return nil
	}
	if err := r.it.Err(); err != nil {
		return err
	}
	return io.EOF
}
func (r *quadReader) Close() error { return r.it.Close() }
back to top