https://github.com/google/cayley
Raw File
Tip revision: 03ab36624ad88600895d6dcb9c4249192ac14a6e authored by admin on 15 February 2019, 06:52:25 UTC
kv: get badger to work
Tip revision: 03ab366
pquads.go
// Package pquads implements Cayley-specific protobuf-based quads format.
package pquads

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"

	"github.com/gogo/protobuf/proto"

	"github.com/cayleygraph/cayley/quad"
	"github.com/cayleygraph/cayley/quad/pquads/pio"
)

// DefaultMaxSize is the read buffer limit passed to readers created through
// the registered format, and the fallback NewReader uses when it is given a
// non-positive maxSize.
var DefaultMaxSize = 1024 * 1024

// currentVersion is the format version NewWriter stores right after the magic
// bytes; NewReader rejects streams carrying any other value.
const currentVersion = 1

// magic is the 4-byte signature that starts every pquads stream.
var magic = [4]byte{0, 'p', 'q', 0}

// ContentType is the primary MIME type registered for the pquads format.
const ContentType = "application/x-protobuf"

// init registers the pquads format with the quad package so it can be looked
// up by name, file extension or MIME type.
func init() {
	format := quad.Format{
		Name:           "pquads",
		Binary:         true,
		Ext:            []string{".pq"},
		Mime:           []string{ContentType, "application/octet-stream"},
		MarshalValue:   MarshalValue,
		UnmarshalValue: UnmarshalValue,
	}
	// Writers use default options; readers use the package-wide size limit.
	format.Writer = func(w io.Writer) quad.WriteCloser {
		return NewWriter(w, nil)
	}
	format.Reader = func(r io.Reader) quad.ReadCloser {
		return NewReader(r, DefaultMaxSize)
	}
	quad.RegisterFormat(format)
}

// Writer encodes quads as a stream of protobuf messages.
//
// Create it with NewWriter; a Writer whose construction failed keeps the error
// and returns it from WriteQuad.
type Writer struct {
	// pw writes individual protobuf messages to the destination.
	pw      pio.Writer
	// max is the largest size returned by pw.WriteMsg for a quad so far.
	max     int
	// err is the first error encountered; once set, WriteQuad returns it.
	err     error
	// opts holds the options the stream header was written with.
	opts    Options
	// s, p, o remember the previous quad's subject, predicate and object so
	// that repeated values can be omitted when opts.Full is false.
	s, p, o quad.Value
	// cl is an optional closer invoked by Close; set via SetCloser.
	cl      io.Closer
}

// Options controls how a Writer encodes quads. The chosen options are stored
// in the stream header and restored by NewReader.
type Options struct {
	// Full can be set to disable compaction of quad values.
	//
	// This increases file size, but makes skipping faster because skipped
	// quads do not have to be unmarshaled at all.
	Full bool
	// Strict can be set to only marshal quads allowed by the RDF spec.
	Strict bool
}

// NewWriter creates a protobuf quads encoder on top of w.
//
// It immediately writes the stream prefix (4 magic bytes followed by the
// format version as a little-endian uint32) and then an options header
// message. A nil opts is treated as the zero Options value. Any error that
// occurs here is stored in the returned Writer and reported by WriteQuad.
func NewWriter(w io.Writer, opts *Options) *Writer {
	var settings Options
	if opts != nil {
		settings = *opts
	}

	// Stream prefix: file magic, then the version number.
	var prefix [8]byte
	copy(prefix[:4], magic[:])
	binary.LittleEndian.PutUint32(prefix[4:], currentVersion)
	if _, err := w.Write(prefix[:]); err != nil {
		return &Writer{err: err}
	}

	out := &Writer{pw: pio.NewWriter(w), opts: settings}
	// The header records the options so that the reader can mirror them.
	_, out.err = out.pw.WriteMsg(&Header{
		Full:      settings.Full,
		NotStrict: !settings.Strict,
	})
	return out
}
// WriteQuad encodes q and appends it to the stream.
//
// Unless Options.Full is set, a subject, predicate or object equal to the one
// in the previously written quad is replaced with nil before encoding, and the
// remembered value is updated otherwise. With Options.Strict the quad is
// converted with makeStrictQuad, which may fail. Any error is stored and
// returned by all subsequent calls.
func (w *Writer) WriteQuad(q quad.Quad) error {
	if w.err != nil {
		return w.err
	}
	if !w.opts.Full {
		// Drop a value that repeats the previous one, otherwise remember it.
		compact := func(cur, prev *quad.Value) {
			if *cur == *prev {
				*cur = nil
				return
			}
			*prev = *cur
		}
		compact(&q.Subject, &w.s)
		compact(&q.Predicate, &w.p)
		compact(&q.Object, &w.o)
	}
	var msg proto.Message
	if !w.opts.Strict {
		msg = makeWireQuad(q)
	} else if msg, w.err = makeStrictQuad(q); w.err != nil {
		return w.err
	}
	size, err := w.pw.WriteMsg(msg)
	w.err = err
	// Track the largest message so callers can query it through MaxSize.
	if size > w.max {
		w.max = size
	}
	return err
}

// MaxSize returns the largest message size reported by the underlying message
// writer for any single quad written so far. The options header written by
// NewWriter is not counted.
func (w *Writer) MaxSize() int {
	return w.max
}
// SetCloser installs c as the closer that Close will call. It replaces any
// previously installed closer.
func (w *Writer) SetCloser(c io.Closer) {
	w.cl = c
}
// Close closes the closer installed with SetCloser and returns its error.
// It is a no-op returning nil when no closer was installed.
func (w *Writer) Close() error {
	if w.cl == nil {
		return nil
	}
	return w.cl.Close()
}

// Reader decodes quads from a stream produced by Writer.
//
// Create it with NewReader; a Reader whose construction failed keeps the error
// and returns it from ReadQuad.
type Reader struct {
	// pr reads individual protobuf messages from the source.
	pr      pio.Reader
	// err is the stored error; once set, ReadQuad returns it.
	err     error
	// opts holds the options decoded from the stream header.
	opts    Options
	// s, p, o remember the last non-nil subject, predicate and object so that
	// omitted values can be restored.
	s, p, o quad.Value
	// cl is an optional closer invoked by Close; set via SetCloser.
	cl      io.Closer
}

// SetCloser installs c as the closer that Close will call. It replaces any
// previously installed closer.
func (r *Reader) SetCloser(c io.Closer) {
	r.cl = c
}

// Compile-time check that *Reader implements quad.Skipper.
var _ quad.Skipper = (*Reader)(nil)

// NewReader creates protobuf quads decoder.
//
// MaxSize argument limits maximal size of the buffer used to read quads;
// a non-positive value selects DefaultMaxSize.
//
// NewReader consumes the stream prefix (4 magic bytes and a little-endian
// uint32 version) and the options header from r. If the prefix cannot be read,
// the magic or version does not match, or the header cannot be decoded, the
// error is stored in the returned Reader and reported by ReadQuad and
// SkipQuad; NewReader itself never returns nil.
func NewReader(r io.Reader, maxSize int) *Reader {
	if maxSize <= 0 {
		maxSize = DefaultMaxSize
	}
	qr := &Reader{}
	buf := make([]byte, 8)
	if _, err := io.ReadFull(r, buf); err != nil {
		qr.err = err
		return qr
	}
	if !bytes.Equal(magic[:], buf[:4]) {
		qr.err = fmt.Errorf("not a pquads file")
		return qr
	}
	if vers := binary.LittleEndian.Uint32(buf[4:]); vers != currentVersion {
		qr.err = fmt.Errorf("unsupported pquads version: %d", vers)
		return qr
	}

	qr.pr = pio.NewReader(r, maxSize)
	var h Header
	if err := qr.pr.ReadMsg(&h); err != nil {
		// Do not derive options from a header that failed to decode: it may
		// be partially populated and must not influence later calls.
		qr.err = err
		return qr
	}
	qr.opts = Options{
		Full:   h.Full,
		Strict: !h.NotStrict,
	}
	return qr
}
// ReadQuad decodes the next quad from the stream.
//
// If an error is already stored it is returned without reading. The message
// is decoded as a StrictQuad or a WireQuad depending on the Strict option from
// the stream header. A nil subject, predicate or object in the decoded quad is
// replaced with the last non-nil value seen for that position; non-nil values
// are remembered for later quads. A read error is stored and returned together
// with an empty quad.
func (r *Reader) ReadQuad() (quad.Quad, error) {
	if r.err != nil {
		return quad.Quad{}, r.err
	}
	var out quad.Quad
	if !r.opts.Strict {
		var wq WireQuad
		r.err = r.pr.ReadMsg(&wq)
		if r.err != nil {
			return quad.Quad{}, r.err
		}
		out = wq.ToNative()
	} else {
		var sq StrictQuad
		r.err = r.pr.ReadMsg(&sq)
		if r.err != nil {
			return quad.Quad{}, r.err
		}
		out = sq.ToNative()
	}
	// Fill in an omitted value from the previous quad, otherwise remember it.
	restore := func(cur, prev *quad.Value) {
		if *cur == nil {
			*cur = *prev
			return
		}
		*prev = *cur
	}
	restore(&out.Subject, &r.s)
	restore(&out.Predicate, &r.p)
	restore(&out.Object, &r.o)
	return out, nil
}
// SkipQuad advances the stream past the next quad without returning it.
//
// If an error is already stored it is returned without touching the stream,
// matching ReadQuad. When the stream was written without the Full option,
// quads may omit values that repeat the previous quad, so the quad has to be
// decoded with ReadQuad to keep the remembered values up to date. With Full
// set, the message is skipped without decoding.
func (r *Reader) SkipQuad() error {
	if r.err != nil {
		// Keep the error sticky in Full mode too, rather than calling
		// SkipMsg again and overwriting the original failure.
		return r.err
	}
	if !r.opts.Full {
		// TODO(dennwc): read pb fields as bytes and unmarshal them only if ReadQuad is called
		_, err := r.ReadQuad()
		return err
	}
	r.err = r.pr.SkipMsg()
	return r.err
}
// Close closes the closer installed with SetCloser and returns its error.
// It is a no-op returning nil when no closer was installed.
func (r *Reader) Close() error {
	if r.cl == nil {
		return nil
	}
	return r.cl.Close()
}
back to top