https://github.com/google/cayley
Tip revision: b0c2d1d891ffef8410d77958f1a59f5897ef539d authored by Mikael Cabot on 15 September 2016, 20:34:29 UTC
cleaning
cleaning
Tip revision: b0c2d1d
quadstore.go
// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gaedatastore
import (
"encoding/hex"
"errors"
"math"
"net/http"
"github.com/cayleygraph/cayley/clog"
"golang.org/x/net/context"
"google.golang.org/appengine"
"google.golang.org/appengine/datastore"
"github.com/cayleygraph/cayley/graph"
"github.com/cayleygraph/cayley/graph/iterator"
"github.com/cayleygraph/cayley/quad"
)
// Names used when registering the store and when building datastore keys.
const (
// QuadStoreType is the string reported by QuadStore.Type.
QuadStoreType = "gaedatastore"
// quadKind is the Kind given to tokens and datastore keys that refer to quads.
quadKind = "quad"
// nodeKind is the Kind given to tokens and datastore keys that refer to nodes.
nodeKind = "node"
)
var (
// Order of quad fields: subject, predicate, object, then label.
// NOTE(review): spo is not referenced in the visible part of this file;
// presumably other files in the package use it — confirm before removing.
spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label}
)
// QuadStore is the quad store registered under the "gaedatastore" name.
// Every datastore call in this file goes through its context field.
type QuadStore struct {
// context is set by newQuadStoreForRequest from the request's HTTP request
// and cleared by Close; methods that need it check for nil first.
context context.Context
}
// MetadataEntry is the single datastore entity that carries the store-wide
// counters. updateMetadata adds to both fields; Size and NodeSize read them.
type MetadataEntry struct {
// NodeCount is the value returned by NodeSize.
NodeCount int64
// QuadCount is the value returned by Size.
QuadCount int64
}
// Token identifies a node or a quad and is what this store hands out as a
// graph.Value. It maps directly onto a datastore key (see createKeyFromToken).
type Token struct {
// Kind becomes the datastore kind of the key (nodeKind, quadKind, ...).
Kind string
// Hash becomes the string ID of the key. For nodes it is the hex-encoded
// hash of the value; for quads it is the four direction hashes concatenated
// in subject, predicate, object, label order.
Hash string
}
// IsNode reports whether the token refers to a node rather than a quad.
func (t Token) IsNode() bool {
	return nodeKind == t.Kind
}
// QuadEntry is the datastore entity stored for each quad.
type QuadEntry struct {
// Hash is the string ID of the quad's datastore key.
Hash string
// Added collects the IDs of the deltas that added this quad (not indexed).
Added []string `datastore:",noindex"`
// Deleted collects the IDs of the deltas that deleted this quad (not indexed).
Deleted []string `datastore:",noindex"`
// Subject, Predicate, Object and Label hold the string form of each
// direction of the quad as written by updateQuads.
Subject string `datastore:"subject"`
Predicate string `datastore:"predicate"`
Object string `datastore:"object"`
Label string `datastore:"label"`
}
// NodeEntry is the datastore entity stored for each node.
type NodeEntry struct {
// Name is the string form of the node's value; NameOf turns it back into
// a raw quad value.
Name string
// Size is a running total maintained by updateNodes: +1 for every added
// quad position that references the node and -1 for every deleted one.
Size int64
}
// LogEntry is the datastore entity written by updateLog for each applied delta.
type LogEntry struct {
// LogID is the string form of the delta's ID; Horizon returns the LogID of
// the entry with the greatest Timestamp.
LogID string
// Action is either "Add" or "Delete".
Action string
// Key is the string form of the datastore key of the affected quad.
Key string
// Timestamp is the delta's timestamp in nanoseconds since the Unix epoch.
Timestamp int64
}
// init makes this backend available to the graph package under the
// "gaedatastore" name.
func init() {
	registration := graph.QuadStoreRegistration{
		NewFunc:           newQuadStore,
		NewForRequestFunc: newQuadStoreForRequest,
		UpgradeFunc:       nil,
		InitFunc:          initQuadStore,
		IsPersistent:      true,
	}
	graph.RegisterQuadStore("gaedatastore", registration)
}
// initQuadStore is the InitFunc registered for this backend. It currently
// ignores both arguments and always succeeds.
func initQuadStore(_ string, _ graph.Options) error {
// TODO (panamafrancis) check appengine datastore for consistency
return nil
}
// newQuadStore returns an empty QuadStore. The store has no context yet, so
// it cannot talk to the datastore until newQuadStoreForRequest supplies one.
func newQuadStore(_ string, options graph.Options) (graph.QuadStore, error) {
	return &QuadStore{}, nil
}
// newQuadStoreForRequest builds a fresh QuadStore bound to the context derived
// from the HTTP request carried in options. The qs argument is not used.
//
// If the context cannot be obtained, the store is still returned (with a nil
// context) together with the error.
func newQuadStoreForRequest(qs graph.QuadStore, options graph.Options) (graph.QuadStore, error) {
	store, err := newQuadStore("", options)
	if err != nil {
		return nil, err
	}
	concrete := store.(*QuadStore)
	ctx, ctxErr := getContext(options)
	concrete.context = ctx
	return store, ctxErr
}
// createKeyForQuad returns the datastore key for q. The key's string ID is the
// concatenation of the subject, predicate, object and label hashes, in that
// order.
func (qs *QuadStore) createKeyForQuad(q quad.Quad) *datastore.Key {
	combined := hashOf(q.Subject) + hashOf(q.Predicate) + hashOf(q.Object) + hashOf(q.Label)
	tok := Token{Kind: quadKind, Hash: combined}
	return qs.createKeyFromToken(&tok)
}
// hashOf returns the hex encoding of the quad package's hash of s.
func hashOf(s quad.Value) string {
	digest := quad.HashOf(s)
	return hex.EncodeToString(digest)
}
// createKeyForNode returns the datastore key for node n; the key's string ID
// is the hex hash of n.
func (qs *QuadStore) createKeyForNode(n quad.Value) *datastore.Key {
	tok := Token{Kind: nodeKind, Hash: hashOf(n)}
	return qs.createKeyFromToken(&tok)
}
// createKeyForMetadata returns the fixed key under which the single
// MetadataEntry is stored.
func (qs *QuadStore) createKeyForMetadata() *datastore.Key {
	tok := Token{Kind: "metadata", Hash: "metadataentry"}
	return qs.createKeyFromToken(&tok)
}
// createKeyForLog returns the "logentry" key whose string ID is the string
// form of deltaID.
func (qs *QuadStore) createKeyForLog(deltaID graph.PrimaryKey) *datastore.Key {
	id := deltaID.String()
	return datastore.NewKey(qs.context, "logentry", id, 0, nil)
}
// createKeyFromToken converts a token into a parentless datastore key, using
// the token's Kind as the key kind and its Hash as the string ID.
func (qs *QuadStore) createKeyFromToken(t *Token) *datastore.Key {
	kind, stringID := t.Kind, t.Hash
	return datastore.NewKey(qs.context, kind, stringID, 0, nil)
}
// checkValid reports whether an entity exists under key k.
//
// The entity is loaded into a throwaway quad.Quad only to probe for
// existence, so a field-mismatch error from the datastore is treated as
// "exists". A missing entity yields (false, nil); any other error is logged
// and returned.
func (qs *QuadStore) checkValid(k *datastore.Key) (bool, error) {
	var probe quad.Quad
	err := datastore.Get(qs.context, k, &probe)
	if err == nil {
		return true, nil
	}
	if err == datastore.ErrNoSuchEntity {
		return false, nil
	}
	if _, mismatch := err.(*datastore.ErrFieldMismatch); mismatch {
		// The entity is there; it just does not fit the probe struct.
		return true, nil
	}
	clog.Warningf("Error occured when getting quad/node %s %v", k, err)
	return false, err
}
// getContext builds an App Engine context from the *http.Request stored under
// the "HTTPRequest" key of opts.
//
// It returns an error (and logs it) when the option is absent, is not an
// *http.Request, or is a nil request. The checked type assertion matters: an
// unchecked one panics when the key is missing or holds another type.
func getContext(opts graph.Options) (context.Context, error) {
	req, ok := opts["HTTPRequest"].(*http.Request)
	if !ok || req == nil {
		err := errors.New("HTTP Request needed")
		clog.Errorf("%v", err)
		return nil, err
	}
	return appengine.NewContext(req), nil
}
// ApplyDeltas validates the deltas in in and then applies the surviving ones.
//
// Validation, per delta and in order:
//   - an action other than Add or Delete aborts with an error;
//   - an Add of a quad that already exists is dropped when
//     ignoreOpts.IgnoreDup is set, otherwise graph.ErrQuadExists is returned;
//   - a Delete of a quad that does not exist is kept when
//     ignoreOpts.IgnoreMissing is set, otherwise graph.ErrQuadNotExist is
//     returned.
//
// If nothing survives validation the call is a no-op. Otherwise the log, the
// quads, the nodes and finally the metadata counters are updated, stopping at
// the first failure.
func (qs *QuadStore) ApplyDeltas(in []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
	if qs.context == nil {
		return errors.New("No context, graph not correctly initialised")
	}

	accepted := make([]graph.Delta, 0, len(in))
	for _, delta := range in {
		isAdd := delta.Action == graph.Add
		if !isAdd && delta.Action != graph.Delete {
			// Defensive shortcut
			return errors.New("Datastore: invalid action")
		}

		exists, err := qs.checkValid(qs.createKeyForQuad(delta.Quad))
		if err != nil {
			return err
		}

		if isAdd {
			if !exists {
				accepted = append(accepted, delta)
			} else if !ignoreOpts.IgnoreDup {
				return graph.ErrQuadExists
			}
			continue
		}

		// Delete
		if !exists && !ignoreOpts.IgnoreMissing {
			return graph.ErrQuadNotExist
		}
		accepted = append(accepted, delta)
	}

	if len(accepted) == 0 {
		return nil
	}

	if err := qs.updateLog(accepted); err != nil {
		clog.Errorf("Updating log failed %v", err)
		return err
	}
	if clog.V(2) {
		clog.Infof("Existence verified. Proceeding.")
	}

	quadsAdded, err := qs.updateQuads(accepted)
	if err != nil {
		clog.Errorf("UpdateQuads failed %v", err)
		return err
	}

	nodesAdded, err := qs.updateNodes(accepted)
	if err != nil {
		clog.Warningf("UpdateNodes failed %v", err)
		return err
	}

	if err := qs.updateMetadata(quadsAdded, nodesAdded); err != nil {
		clog.Warningf("UpdateMetadata failed %v", err)
		return err
	}
	return nil
}
// updateNodes applies the per-node reference-count changes implied by in.
//
// Every Add contributes +1 and every Delete -1 to the subject, predicate,
// object and (when non-nil) label of its quad. The resulting deltas are added
// to the Size already stored for each node and written back in batches of at
// most five nodes, each batch inside its own cross-group transaction.
//
// It returns the net number of added minus deleted deltas, or the first error
// encountered.
func (qs *QuadStore) updateNodes(in []graph.Delta) (int64, error) {
	// Collate changes to each node.
	var nodesAdded int64
	nodeDeltas := make(map[quad.Value]int64)
	for _, d := range in {
		var countDelta int64 = -1
		if d.Action == graph.Add {
			countDelta = 1
		}
		nodeDeltas[d.Quad.Subject] += countDelta
		nodeDeltas[d.Quad.Object] += countDelta
		nodeDeltas[d.Quad.Predicate] += countDelta
		if d.Quad.Label != nil {
			nodeDeltas[d.Quad.Label] += countDelta
		}
		// NOTE(review): this tallies one unit per quad delta, not per distinct
		// node created or removed; kept as-is because the stored NodeCount
		// depends on it — confirm the intended meaning.
		nodesAdded += countDelta
	}

	// Create keys and the delta-only entries, index-aligned with each other.
	keys := make([]*datastore.Key, 0, len(nodeDeltas))
	tempNodes := make([]NodeEntry, 0, len(nodeDeltas))
	for k, v := range nodeDeltas {
		keys = append(keys, qs.createKeyForNode(k))
		tempNodes = append(tempNodes, NodeEntry{k.String(), v})
	}

	// In accordance with the appengine datastore spec, cross group transactions
	// like these can only be done in batches of 5.
	for i := 0; i < len(keys); i += 5 {
		j := int(math.Min(float64(len(keys)-i), 5))
		batchKeys := keys[i : i+j]
		batchDeltas := tempNodes[i : i+j]

		err := datastore.RunInTransaction(qs.context, func(c context.Context) error {
			// The transaction function may be retried, so everything it
			// mutates is rebuilt on each attempt; batchDeltas is only read.
			foundNodes := make([]NodeEntry, j)
			err := datastore.GetMulti(c, batchKeys, foundNodes)
			if me, ok := err.(appengine.MultiError); ok {
				// Missing nodes are expected (they are about to be created).
				for _, merr := range me {
					if merr != nil && merr != datastore.ErrNoSuchEntity {
						clog.Errorf("Error: %v", merr)
						return merr
					}
				}
			} else if err != nil {
				// A failure of the whole call must not be mistaken for
				// "no nodes stored yet": that would overwrite existing sizes.
				clog.Errorf("Error: %v", err)
				return err
			}

			// Carry forward the sizes of the nodes from the datastore.
			updated := make([]NodeEntry, j)
			copy(updated, batchDeltas)
			for k := range foundNodes {
				if foundNodes[k].Name != "" {
					updated[k].Size += foundNodes[k].Size
				}
			}
			_, err = datastore.PutMulti(c, batchKeys, updated)
			return err
		}, &datastore.TransactionOptions{XG: true})
		if err != nil {
			clog.Errorf("Error: %v", err)
			return 0, err
		}
	}
	return nodesAdded, nil
}
// updateQuads writes one QuadEntry per delta in in, in batches of at most five
// quads, each batch inside its own cross-group transaction.
//
// For every delta the existing entry (if any) is loaded so that its Added and
// Deleted history is preserved; the delta's ID is then appended to Added or
// Deleted and the entry is written back with the quad's current field values.
//
// It returns the net number of added minus deleted quads across all batches
// that committed, or the first transaction error.
func (qs *QuadStore) updateQuads(in []graph.Delta) (int64, error) {
	keys := make([]*datastore.Key, 0, len(in))
	for _, d := range in {
		keys = append(keys, qs.createKeyForQuad(d.Quad))
	}

	var quadCount int64
	for i := 0; i < len(in); i += 5 {
		// Find the closest batch of 5
		j := int(math.Min(float64(len(in)-i), 5))
		batchKeys := keys[i : i+j]
		batch := in[i : i+j]

		// The transaction function may be retried; the batch tally is reset on
		// every attempt and only folded into quadCount after a commit.
		var batchCount int64
		err := datastore.RunInTransaction(qs.context, func(c context.Context) error {
			batchCount = 0
			foundQuads := make([]QuadEntry, j)
			// Load only this batch: the key and destination slices must have
			// the same length. Errors are deliberately ignored, since missing
			// quads are expected and every field except the Added/Deleted
			// history is overwritten below anyway.
			_ = datastore.GetMulti(c, batchKeys, foundQuads)
			for k := range foundQuads {
				entry := &foundQuads[k]
				d := batch[k]
				entry.Hash = batchKeys[k].StringID()
				entry.Subject = d.Quad.Subject.String()
				entry.Predicate = d.Quad.Predicate.String()
				entry.Object = d.Quad.Object.String()
				entry.Label = quad.StringOf(d.Quad.Label)

				// If the quad exists the Added[] will be non-empty
				if d.Action == graph.Add {
					entry.Added = append(entry.Added, d.ID.String())
					batchCount++
				} else {
					entry.Deleted = append(entry.Deleted, d.ID.String())
					batchCount--
				}
			}
			_, err := datastore.PutMulti(c, batchKeys, foundQuads)
			return err
		}, &datastore.TransactionOptions{XG: true})
		if err != nil {
			return 0, err
		}
		quadCount += batchCount
	}
	return quadCount, nil
}
// updateMetadata adds quadsAdded and nodesAdded to the stored QuadCount and
// NodeCount inside a single transaction. A missing metadata entity is treated
// as zero counters and is created by the write.
func (qs *QuadStore) updateMetadata(quadsAdded int64, nodesAdded int64) error {
	key := qs.createKeyForMetadata()
	return datastore.RunInTransaction(qs.context, func(c context.Context) error {
		// Allocated per attempt: the transaction function may be retried, and
		// reusing one struct would add the increments again whenever Get
		// leaves it untouched (ErrNoSuchEntity).
		var md MetadataEntry
		err := datastore.Get(c, key, &md)
		if err != nil && err != datastore.ErrNoSuchEntity {
			clog.Errorf("Error: %v", err)
			return err
		}
		md.QuadCount += quadsAdded
		md.NodeCount += nodesAdded
		_, err = datastore.Put(c, key, &md)
		if err != nil {
			clog.Errorf("Error: %v", err)
		}
		return err
	}, nil)
}
// updateLog writes one LogEntry per delta in in, keyed by the delta's ID.
//
// It fails without writing anything when the store has no context or when in
// is empty; a datastore write failure is logged and returned.
func (qs *QuadStore) updateLog(in []graph.Delta) error {
	if qs.context == nil {
		return errors.New("Error updating log, context is nil, graph not correctly initialised")
	}
	if len(in) == 0 {
		return errors.New("Nothing to log")
	}

	entries := make([]LogEntry, len(in))
	entryKeys := make([]*datastore.Key, len(in))
	for idx, delta := range in {
		verb := "Delete"
		if delta.Action == graph.Add {
			verb = "Add"
		}
		entries[idx] = LogEntry{
			LogID:     delta.ID.String(),
			Action:    verb,
			Key:       qs.createKeyForQuad(delta.Quad).String(),
			Timestamp: delta.Timestamp.UnixNano(),
		}
		entryKeys[idx] = qs.createKeyForLog(delta.ID)
	}

	_, err := datastore.PutMulti(qs.context, entryKeys, entries)
	if err != nil {
		clog.Errorf("Error updating log: %v", err)
	}
	return err
}
// QuadIterator returns an iterator over quad entries constrained by value v in
// direction dir.
func (qs *QuadStore) QuadIterator(dir quad.Direction, v graph.Value) graph.Iterator {
	const kind = quadKind
	return NewIterator(qs, kind, dir, v)
}
// NodesAllIterator returns an iterator over every entry of the node kind.
func (qs *QuadStore) NodesAllIterator() graph.Iterator {
	const kind = nodeKind
	return NewAllIterator(qs, kind)
}
// QuadsAllIterator returns an iterator over every entry of the quad kind.
func (qs *QuadStore) QuadsAllIterator() graph.Iterator {
	const kind = quadKind
	return NewAllIterator(qs, kind)
}
// ValueOf returns the node token for s. The token is computed from the hash of
// s alone; the datastore is not consulted.
func (qs *QuadStore) ValueOf(s quad.Value) graph.Value {
	tok := new(Token)
	tok.Kind = nodeKind
	tok.Hash = hashOf(s)
	return tok
}
// NameOf looks up the node identified by val and returns its stored name as a
// raw quad value.
//
// It logs an error and returns nil when the store has no context, when val is
// not a node *Token, or when the datastore lookup fails.
func (qs *QuadStore) NameOf(val graph.Value) quad.Value {
	if qs.context == nil {
		clog.Errorf("Error in NameOf, context is nil, graph not correctly initialised")
		return nil
	}

	tok, isToken := val.(*Token)
	if !isToken || tok.Kind != nodeKind {
		clog.Errorf("Token not valid")
		return nil
	}

	// TODO (panamafrancis) implement a cache
	var entry NodeEntry
	if err := datastore.Get(qs.context, qs.createKeyFromToken(tok), &entry); err != nil {
		clog.Errorf("Error: %v", err)
		return nil
	}
	return quad.Raw(entry.Name)
}
// Quad loads the quad identified by val and returns it as a raw quad.
//
// An empty quad is returned (after logging) when the store has no context or
// val is not a quad *Token. A datastore error is logged unless it is a field
// mismatch, and in either case the quad is built from whatever fields were
// loaded.
func (qs *QuadStore) Quad(val graph.Value) quad.Quad {
	if qs.context == nil {
		clog.Errorf("Error fetching quad, context is nil, graph not correctly initialised")
		return quad.Quad{}
	}

	tok, isToken := val.(*Token)
	if !isToken || tok.Kind != quadKind {
		clog.Errorf("Token not valid")
		return quad.Quad{}
	}

	var entry QuadEntry
	if err := datastore.Get(qs.context, qs.createKeyFromToken(tok), &entry); err != nil {
		// Red herring error : ErrFieldMismatch can happen when a quad exists but a field is empty
		_, mismatch := err.(*datastore.ErrFieldMismatch)
		if !mismatch {
			clog.Errorf("Error: %v", err)
		}
	}
	return quad.MakeRaw(entry.Subject, entry.Predicate, entry.Object, entry.Label)
}
// Size returns the QuadCount stored in the metadata entity, or 0 (after
// logging) when the store has no context or the entity cannot be read.
func (qs *QuadStore) Size() int64 {
	if qs.context == nil {
		clog.Errorf("Error fetching size, context is nil, graph not correctly initialised")
		return 0
	}

	var md MetadataEntry
	if err := datastore.Get(qs.context, qs.createKeyForMetadata(), &md); err != nil {
		clog.Warningf("Error: %v", err)
		return 0
	}
	return md.QuadCount
}
// NodeSize returns the NodeCount stored in the metadata entity, or 0 (after
// logging) when the store has no context or the entity cannot be read.
func (qs *QuadStore) NodeSize() int64 {
	if qs.context == nil {
		clog.Errorf("Error fetching node size, context is nil, graph not correctly initialised")
		return 0
	}

	var md MetadataEntry
	if err := datastore.Get(qs.context, qs.createKeyForMetadata(), &md); err != nil {
		clog.Warningf("Error: %v", err)
		return 0
	}
	return md.NodeCount
}
// Horizon returns a key built from the LogID of the log entry with the
// greatest Timestamp. When the store has no context, the query fails, or the
// log is empty, the key is built from the empty string instead.
func (qs *QuadStore) Horizon() graph.PrimaryKey {
	if qs.context == nil {
		clog.Warningf("Warning: HTTP Request context is nil, cannot get horizon from datastore.")
		return graph.NewUniqueKey("")
	}

	// Query log for last entry...
	var newest []LogEntry
	query := datastore.NewQuery("logentry").Order("-Timestamp").Limit(1)
	if _, err := query.GetAll(qs.context, &newest); err != nil || len(newest) == 0 {
		// Error fetching horizon, probably graph is empty
		return graph.NewUniqueKey("")
	}
	return graph.NewUniqueKey(newest[0].LogID)
}
// compareTokens reports whether a and b are *Token values with the same Kind
// and Hash.
//
// Values that are not *Token never compare equal. The checked assertions
// matter: unchecked ones would panic on any other value type. Two nil *Token
// values are considered equal; a nil and a non-nil token are not.
func compareTokens(a, b graph.Value) bool {
	atok, aok := a.(*Token)
	btok, bok := b.(*Token)
	if !aok || !bok {
		return false
	}
	if atok == nil || btok == nil {
		return atok == btok
	}
	return atok.Kind == btok.Kind && atok.Hash == btok.Hash
}
// FixedIterator returns a new fixed iterator that uses compareTokens to decide
// whether two values are equal.
func (qs *QuadStore) FixedIterator() graph.FixedIterator {
	equal := compareTokens
	return iterator.NewFixed(equal)
}
// OptimizeIterator performs no store-specific optimisation: it ignores it and
// always returns (nil, false).
func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) {
return nil, false
}
// Close drops the store's context. Methods that check for a nil context will
// log an error and return a zero result afterwards.
func (qs *QuadStore) Close() {
qs.context = nil
}
// QuadDirection returns the node token found in direction dir of the quad
// token val.
//
// A quad token's Hash is four fixed-width hex hashes laid end to end in
// subject, predicate, object, label order, so the node hash is simply sliced
// out; the datastore is not consulted.
//
// It logs an error and returns nil when val is not a *Token, when it is a node
// token, when dir is not one of the four quad directions, or when the hash is
// too short to contain the requested direction.
func (qs *QuadStore) QuadDirection(val graph.Value, dir quad.Direction) graph.Value {
	t, ok := val.(*Token)
	if !ok {
		clog.Errorf("Token not valid")
		return nil
	}
	if t.Kind == nodeKind {
		clog.Errorf("Node tokens not valid")
		return nil
	}

	// Each hash occupies two hex characters per hash byte.
	width := quad.HashSize * 2

	var slot int
	switch dir {
	case quad.Subject:
		slot = 0
	case quad.Predicate:
		slot = 1
	case quad.Object:
		slot = 2
	case quad.Label:
		slot = 3
	default:
		// Without this case an unsupported direction would silently be
		// answered with the subject.
		clog.Errorf("Direction not valid: %v", dir)
		return nil
	}

	start := slot * width
	end := start + width
	if end > len(t.Hash) {
		// Guard against malformed tokens instead of panicking on the slice.
		clog.Errorf("Token hash too short: %d characters, need %d", len(t.Hash), end)
		return nil
	}
	return &Token{Kind: nodeKind, Hash: t.Hash[start:end]}
}
// Type returns QuadStoreType, the name of this backend.
func (qs *QuadStore) Type() string {
return QuadStoreType
}