https://github.com/google/cayley
Tip revision: 8c66cea771069d40ecbe82559ef1e5f9a0d3604c authored by Misha Brukman on 25 September 2022, 03:56:38 UTC
Fix URL pointing to Gremlin docs
Fix URL pointing to Gremlin docs
Tip revision: 8c66cea
quadstore.go
// Copyright 2017 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"os"
"sync"
"github.com/cayleygraph/cayley/graph"
"github.com/cayleygraph/cayley/graph/proto"
"github.com/cayleygraph/cayley/graph/refs"
"github.com/cayleygraph/cayley/internal/lru"
"github.com/cayleygraph/cayley/query/shape"
"github.com/cayleygraph/quad"
"github.com/cayleygraph/quad/pquads"
"github.com/hidal-go/hidalgo/kv"
boom "github.com/tylertreat/BoomFilters"
)
// Sentinel errors of the kv quad store.
var (
// ErrNoBucket is returned by the metadata readers in this file when the
// requested metadata key is not present in the underlying database.
ErrNoBucket = errors.New("kv: no bucket")
// ErrEmptyPath reports that no database path was given. It is not
// referenced in the visible part of this file.
ErrEmptyPath = errors.New("kv: path to the database must be specified")
)
// Registration describes a key-value backend that Register exposes as a
// quad store.
type Registration struct {
// NewFunc opens the backend database; Register calls it whenever the quad
// store is opened.
NewFunc NewFunc
// InitFunc opens the backend database for initialization; Register calls
// it only when IsPersistent is true.
InitFunc InitFunc
// IsPersistent is forwarded to graph.QuadStoreRegistration. When false,
// Register skips InitFunc and runs Init each time the store is opened.
IsPersistent bool
}
// InitFunc opens the key-value database at the given address with the given
// options so that it can be initialized.
type InitFunc func(string, graph.Options) (kv.KV, error)
// NewFunc opens the key-value database at the given address with the given
// options so that it can be used as a quad store.
type NewFunc func(string, graph.Options) (kv.KV, error)
// Register registers a key-value backend as a quad store under the given
// name.
//
// For persistent backends the registered init function opens the database
// with r.InitFunc, initializes it with Init and closes it again; the
// registered open function then expects an initialized database. For
// non-persistent backends the init function is a no-op and the database is
// initialized every time it is opened.
func Register(name string, r Registration) {
	graph.RegisterQuadStore(name, graph.QuadStoreRegistration{
		InitFunc: func(addr string, opt graph.Options) error {
			if !r.IsPersistent {
				return nil
			}
			db, err := r.InitFunc(addr, opt)
			if err != nil {
				return err
			}
			if err := Init(db, opt); err != nil {
				// The Init error is the one worth reporting; closing is
				// best-effort here.
				_ = db.Close()
				return err
			}
			// Close exactly once and surface the result to the caller.
			return db.Close()
		},
		NewFunc: func(addr string, opt graph.Options) (graph.QuadStore, error) {
			db, err := r.NewFunc(addr, opt)
			if err != nil {
				return nil, err
			}
			if !r.IsPersistent {
				if err := Init(db, opt); err != nil {
					_ = db.Close()
					return nil, err
				}
			}
			qs, err := New(db, opt)
			if err != nil {
				// No quad store was returned, so nobody else can release
				// the database handle.
				_ = db.Close()
				return nil, err
			}
			return qs, nil
		},
		IsPersistent: r.IsPersistent,
	})
}
const (
// latestDataVersion is the data version that Init writes and that New
// requires in order to open a database.
latestDataVersion = 2
// envKVDefaultIndexes is the name of the environment variable that Init
// reads; when it is non-empty its value is decoded as JSON index
// configuration.
envKVDefaultIndexes = "CAYLEY_KV_INDEXES"
)
// Compile-time checks that *QuadStore implements the listed interfaces.
var (
_ refs.BatchNamer = (*QuadStore)(nil)
_ shape.Optimizer = (*QuadStore)(nil)
)
// QuadStore is a quad store implemented on top of a generic key-value
// database (kv.KV).
type QuadStore struct {
// db is the underlying key-value database; Close closes it.
db kv.KV
// indexes holds the quad index configuration, guarded by the embedded
// RWMutex.
indexes struct {
sync.RWMutex
// all is the full list of quad indexes. Init fills it before writing the
// index metadata and New loads it back from that metadata.
all []QuadIndex
// indexes used to detect duplicate quads
exists []QuadIndex
}
// valueLRU is created by New via lru.New(2000).
valueLRU *lru.Cache
// NOTE(review): writer presumably serializes write operations; it is not
// used in the visible part of this file.
writer sync.Mutex
// mapBucket is not used in the visible part of this file.
mapBucket map[string]map[string][]uint64
// mapBloom and mapNodes are allocated by New only when the bloom filter is
// not disabled and the stored size is zero.
mapBloom map[string]*boom.BloomFilter
mapNodes *boom.BloomFilter
// exists groups the state of the bloom filter set up by initBloomFilter.
exists struct {
// disabled is set by New from the OptNoBloom option.
disabled bool
sync.Mutex
buf []byte
*boom.DeletableBloomFilter
}
}
// newQuadStore returns a QuadStore that wraps the given database. All other
// fields are left at their zero values; Init and New fill in what they need.
func newQuadStore(kv kv.KV) *QuadStore {
	qs := new(QuadStore)
	qs.db = kv
	return qs
}
// Init prepares an empty key-value database for use as a quad store.
//
// It creates the buckets, writes the current data version and stores the
// index configuration. The index list defaults to DefaultQuadIndexes and can
// be overridden with a JSON list in the CAYLEY_KV_INDEXES environment
// variable. The boolean "upfront" option is forwarded to createBuckets.
//
// Init returns graph.ErrDatabaseExists if version metadata is already
// present in the database.
func Init(kv kv.KV, opt graph.Options) error {
	ctx := context.TODO()
	qs := newQuadStore(kv)
	if data := os.Getenv(envKVDefaultIndexes); data != "" {
		qs.indexes.all = nil
		// Decode into the index list itself. The enclosing anonymous struct
		// has only unexported fields, so decoding into it could never
		// populate the list.
		if err := json.Unmarshal([]byte(data), &qs.indexes.all); err != nil {
			return fmt.Errorf("kv: cannot parse %s: %w", envKVDefaultIndexes, err)
		}
	}
	if qs.indexes.all == nil {
		qs.indexes.all = DefaultQuadIndexes
	}
	// A readable version means the database was initialized before; only a
	// missing version key lets initialization proceed.
	if _, err := qs.getMetadata(ctx); err == nil {
		return graph.ErrDatabaseExists
	} else if !errors.Is(err, ErrNoBucket) {
		return err
	}
	upfront, err := opt.BoolKey("upfront", false)
	if err != nil {
		return err
	}
	if err := qs.createBuckets(ctx, upfront); err != nil {
		return err
	}
	if err := setVersion(ctx, qs.db, latestDataVersion); err != nil {
		return err
	}
	return qs.writeIndexesMeta(ctx)
}
const (
// OptNoBloom is the name of the boolean option read by New; its value is
// stored in the quad store's exists.disabled flag.
OptNoBloom = "no_bloom"
)
// New opens a quad store on top of an already initialized key-value
// database.
//
// It returns graph.ErrNotInitialized when the database has no version
// metadata, and an error when the stored data version differs from
// latestDataVersion. The OptNoBloom option disables the bloom filter; a value
// of the wrong type for that option is reported as an error.
//
// New does not close kv on failure; the caller keeps ownership of it.
func New(kv kv.KV, opt graph.Options) (graph.QuadStore, error) {
	ctx := context.TODO()
	qs := newQuadStore(kv)

	vers, err := qs.getMetadata(ctx)
	switch {
	case errors.Is(err, ErrNoBucket):
		return nil, graph.ErrNotInitialized
	case err != nil:
		return nil, err
	case vers != latestDataVersion:
		return nil, errors.New("kv: data version is out of date. Run cayleyupgrade for your config to update the data")
	}

	list, err := qs.readIndexesMeta(ctx)
	if err != nil {
		return nil, err
	}
	qs.indexes.all = list
	qs.valueLRU = lru.New(2000)

	// Report a malformed option instead of silently falling back to the
	// default, matching how Init treats the "upfront" option.
	noBloom, err := opt.BoolKey(OptNoBloom, false)
	if err != nil {
		return nil, err
	}
	qs.exists.disabled = noBloom

	if err := qs.initBloomFilter(ctx); err != nil {
		return nil, err
	}
	if !qs.exists.disabled {
		sz, err := qs.getSize()
		if err != nil {
			return nil, err
		}
		if sz == 0 {
			// The extra in-memory filters are only set up for an empty store.
			qs.mapBloom = make(map[string]*boom.BloomFilter)
			qs.mapNodes = boom.NewBloomFilter(100*1000*1000, 0.05)
		}
	}
	return qs, nil
}
// setVersion stores version under the "version" key of the metadata bucket,
// encoded as an 8-byte little-endian integer, in a single update
// transaction. A write failure is wrapped so that callers can still inspect
// the underlying error.
func setVersion(ctx context.Context, db kv.KV, version int64) error {
	return kv.Update(ctx, db, func(tx kv.Tx) error {
		var buf [8]byte
		binary.LittleEndian.PutUint64(buf[:], uint64(version))
		if err := tx.Put(metaBucket.AppendBytes([]byte("version")), buf[:]); err != nil {
			return fmt.Errorf("couldn't write version: %w", err)
		}
		return nil
	})
}
// getMetaInt reads the integer stored under key in the metadata bucket.
//
// It returns ErrNoBucket when the key does not exist, including when the
// not-found error arrives wrapped. An empty stored value decodes as 0, and a
// value that is neither empty nor 8 bytes long is an error.
func (qs *QuadStore) getMetaInt(ctx context.Context, key string) (int64, error) {
	var v int64
	err := kv.View(qs.db, func(tx kv.Tx) error {
		val, err := tx.Get(ctx, metaBucket.AppendBytes([]byte(key)))
		if errors.Is(err, kv.ErrNotFound) {
			return ErrNoBucket
		} else if err != nil {
			return err
		}
		v, err = asInt64(val, 0)
		return err
	})
	return v, err
}
// getSize returns the value of the "size" metadata key. A missing key is not
// an error: it is reported as a size of 0.
func (qs *QuadStore) getSize() (int64, error) {
	sz, err := qs.getMetaInt(context.TODO(), "size")
	if errors.Is(err, ErrNoBucket) {
		return 0, nil
	}
	return sz, err
}
// Size returns the stored "size" metadata value. The signature has no error
// result, so a failure to read the value is dropped and whatever getSize
// returned alongside it is passed through.
func (qs *QuadStore) Size() int64 {
	size, _ := qs.getSize()
	return size
}
// Stats returns the number of quads and nodes in the store.
//
// The quad count comes from the "size" metadata value and is always exact; a
// missing "size" key is treated as an empty store, as getSize does. The node
// count is estimated as a third of the quad count unless exact is true, in
// which case all nodes are iterated and counted.
//
// If the node iteration fails, the error is returned together with the
// partially filled stats.
func (qs *QuadStore) Stats(ctx context.Context, exact bool) (graph.Stats, error) {
	sz, err := qs.getMetaInt(ctx, "size")
	switch {
	case errors.Is(err, ErrNoBucket):
		// No size recorded yet; report an empty store rather than an error.
		sz = 0
	case err != nil:
		return graph.Stats{}, err
	}
	st := graph.Stats{
		Nodes: refs.Size{
			Value: sz / 3,
			Exact: false, // TODO(dennwc): store nodes count
		},
		Quads: refs.Size{
			Value: sz,
			Exact: true,
		},
	}
	if !exact {
		return st, nil
	}
	// calculate the exact number of nodes
	st.Nodes.Value = 0
	it := qs.NodesAllIterator().Iterate()
	defer it.Close()
	for it.Next(ctx) {
		st.Nodes.Value++
	}
	if err := it.Err(); err != nil {
		return st, err
	}
	st.Nodes.Exact = true
	return st, nil
}
// Close closes the underlying key-value database and returns its error.
func (qs *QuadStore) Close() error {
	db := qs.db
	return db.Close()
}
// getMetadata returns the data version stored under the "version" key of the
// metadata bucket. It returns ErrNoBucket when that key does not exist.
func (qs *QuadStore) getMetadata(ctx context.Context) (int64, error) {
	// The version is an ordinary integer metadata entry, so the generic
	// reader performs exactly the lookup and decoding needed here.
	return qs.getMetaInt(ctx, "version")
}
// asInt64 decodes b as an 8-byte little-endian integer.
//
// An empty (or nil) slice yields the caller-supplied empty value. Any length
// other than 0 or 8 yields 0 and an error.
func asInt64(b []byte, empty int64) (int64, error) {
	switch n := len(b); n {
	case 0:
		return empty, nil
	case 8:
		return int64(binary.LittleEndian.Uint64(b)), nil
	default:
		return 0, fmt.Errorf("unexpected int size: %d", n)
	}
}
// horizon returns the value of the "horizon" metadata key. Read errors,
// including a missing key, are dropped; in those cases the result is 0.
func (qs *QuadStore) horizon(ctx context.Context) int64 {
	value, _ := qs.getMetaInt(ctx, "horizon")
	return value
}
// ValuesOf resolves a batch of refs to their quad values.
//
// The result is parallel to vals. An entry stays nil when the ref is nil, is
// a zero Int64Value, or refers to a primitive that is missing or is not a
// node. Pre-fetched refs are answered from the ref itself; Int64Value refs
// are loaded from the database in one batch. Any other ref type aborts with
// an error.
//
// If some loaded values fail to unmarshal, the remaining ones are still
// filled in and the last unmarshal error is returned with the result.
func (qs *QuadStore) ValuesOf(ctx context.Context, vals []graph.Ref) ([]quad.Value, error) {
	out := make([]quad.Value, len(vals))
	var (
		positions []int    // positions[j] is the index in out for ids[j]
		ids       []uint64 // node ids that must be loaded from the database
	)
	for i, ref := range vals {
		switch r := ref.(type) {
		case nil:
			// Nothing to resolve; the slot stays nil.
		case refs.PreFetchedValue:
			out[i] = r.NameOf()
		case Int64Value:
			if r == 0 {
				continue
			}
			positions = append(positions, i)
			ids = append(ids, uint64(r))
		default:
			return out, fmt.Errorf("unknown type of graph.Ref; not meant for this quadstore. apparently a %#v", r)
		}
	}
	if len(ids) == 0 {
		return out, nil
	}
	prims, err := qs.getPrimitives(ctx, ids)
	if err != nil {
		return out, err
	}
	var lastErr error
	for j, p := range prims {
		if p != nil && p.IsNode() {
			if qv, uerr := pquads.UnmarshalValue(p.Value); uerr != nil {
				lastErr = uerr
			} else {
				out[positions[j]] = qv
			}
		}
	}
	return out, lastErr
}
// RefsOf resolves a batch of quad values to refs inside a single view
// transaction. The result is parallel to nodes and every entry is an
// Int64Value. The first resolution error aborts the batch and is returned
// with a nil slice.
func (qs *QuadStore) RefsOf(ctx context.Context, nodes []quad.Value) ([]graph.Ref, error) {
	out := make([]graph.Ref, len(nodes))
	resolveAll := func(tx kv.Tx) error {
		for i := range nodes {
			id, err := qs.resolveQuadValue(ctx, tx, nodes[i])
			if err != nil {
				return err
			}
			out[i] = Int64Value(id)
		}
		return nil
	}
	if err := kv.View(qs.db, resolveAll); err != nil {
		return nil, err
	}
	return out, nil
}
// NameOf resolves a single ref to its quad value by delegating to ValuesOf
// with a one-element batch. A failure is wrapped with the ref that was being
// resolved.
func (qs *QuadStore) NameOf(v graph.Ref) (quad.Value, error) {
	names, err := qs.ValuesOf(context.TODO(), []graph.Ref{v})
	if err == nil {
		return names[0], nil
	}
	return nil, fmt.Errorf("error getting NameOf %d: %w", v, err)
}
// Quad resolves a quad primitive ref to a quad by looking up the value of
// each direction.
//
// k must be a *proto.Primitive; any other type is an error. A not-found
// error from the lookup, wrapped or not, is not reported: the quad built so
// far is returned with a nil error. Any other failure is wrapped and
// returned with that same partial quad.
func (qs *QuadStore) Quad(k graph.Ref) (quad.Quad, error) {
	key, ok := k.(*proto.Primitive)
	if !ok {
		return quad.Quad{}, fmt.Errorf("passed value was not a quad primitive: %T", k)
	}
	ctx := context.TODO()
	var v quad.Quad
	err := kv.View(qs.db, func(tx kv.Tx) error {
		var err error
		v, err = qs.primitiveToQuad(ctx, tx, key)
		return err
	})
	if err != nil && !errors.Is(err, kv.ErrNotFound) {
		return v, fmt.Errorf("error fetching quad %#v: %w", key, err)
	}
	return v, nil
}
// primitiveToQuad builds a quad from a quad primitive by resolving the id of
// every direction in quad.Directions through the given transaction. On the
// first lookup error it returns that error together with the directions
// filled in so far.
func (qs *QuadStore) primitiveToQuad(ctx context.Context, tx kv.Tx, p *proto.Primitive) (quad.Quad, error) {
	var q quad.Quad
	for _, dir := range quad.Directions {
		val, err := qs.getValFromLog(ctx, tx, p.GetDirection(dir))
		if err != nil {
			return q, err
		}
		q.Set(dir, val)
	}
	return q, nil
}
// getValFromLog loads the primitive with id k through the given transaction
// and unmarshals its value. An id of 0 means "no value" and yields nil, nil
// without touching the database.
func (qs *QuadStore) getValFromLog(ctx context.Context, tx kv.Tx, k uint64) (quad.Value, error) {
	if k != 0 {
		prim, err := qs.getPrimitiveFromLog(ctx, tx, k)
		if err != nil {
			return nil, err
		}
		return pquads.UnmarshalValue(prim.Value)
	}
	return nil, nil
}
// ValueOf resolves a quad value to its ref inside a view transaction. It
// returns a nil ref, with no error, when the value resolves to id 0.
func (qs *QuadStore) ValueOf(s quad.Value) (graph.Ref, error) {
	ctx := context.TODO()
	var id Int64Value
	lookup := func(tx kv.Tx) error {
		raw, err := qs.resolveQuadValue(ctx, tx, s)
		id = Int64Value(raw)
		return err
	}
	switch err := kv.View(qs.db, lookup); {
	case err != nil:
		return nil, err
	case id == 0:
		return nil, nil
	}
	return id, nil
}
// QuadDirection returns the node ref stored in direction d of a quad
// primitive.
//
// It returns nil, nil when val is not a *proto.Primitive, when d is not one
// of subject, predicate, object or label, or when the label id is 0. The
// other three directions are returned as an Int64Value even when their id
// is 0.
func (qs *QuadStore) QuadDirection(val graph.Ref, d quad.Direction) (graph.Ref, error) {
	prim, isPrim := val.(*proto.Primitive)
	if !isPrim {
		return nil, nil
	}
	var id uint64
	switch d {
	case quad.Subject:
		id = prim.Subject
	case quad.Predicate:
		id = prim.Predicate
	case quad.Object:
		id = prim.Object
	case quad.Label:
		if prim.Label == 0 {
			return nil, nil
		}
		id = prim.Label
	default:
		return nil, nil
	}
	return Int64Value(id), nil
}
// getPrimitives loads the primitives with the given ids using a transaction
// opened with qs.db.Tx(false) (presumably read-only; confirm against the kv
// package). The transaction is wrapped with wrapTx for the lookup and the
// originally opened transaction is closed on return.
func (qs *QuadStore) getPrimitives(ctx context.Context, vals []uint64) ([]*proto.Primitive, error) {
	raw, err := qs.db.Tx(false)
	if err != nil {
		return nil, err
	}
	defer raw.Close()
	return qs.getPrimitivesFromLog(ctx, wrapTx(raw), vals)
}
// Int64Value is the ref type this quad store uses for node ids. Despite its
// name, the underlying type is uint64.
type Int64Value uint64

// Key returns the value itself as the key of this ref.
func (v Int64Value) Key() interface{} {
	return v
}