https://github.com/google/cayley
Raw File
Tip revision: 07fd9fac18f610e565add58beafa6ed9b7d98e26 authored by Iddan Aaronsohn on 12 November 2019, 00:41:50 UTC
Only execlude cayley on top level in gitignore
Tip revision: 07fd9fa
api_v2.go
// Copyright 2017 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cayleyhttp

import (
	"compress/gzip"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"strings"
	"time"

	"github.com/julienschmidt/httprouter"

	"github.com/cayleygraph/cayley/clog"
	"github.com/cayleygraph/cayley/graph"
	"github.com/cayleygraph/cayley/query"
	"github.com/cayleygraph/cayley/query/shape"
	_ "github.com/cayleygraph/cayley/writer"
	"github.com/cayleygraph/quad"
)

const prefix = "/api/v2"

func NewAPIv2(h *graph.Handle) *APIv2 {
	return NewAPIv2Writer(h, "single", nil)
}

func NewAPIv2Writer(h *graph.Handle, wtype string, wopts graph.Options) *APIv2 {
	return &APIv2{h: h, wtyp: wtype, wopt: wopts, limit: 100}
}

type APIv2 struct {
	h     *graph.Handle
	ro    bool
	batch int

	// replication
	wtyp string
	wopt graph.Options

	// query
	timeout time.Duration
	limit   int
}

func (api *APIv2) SetReadOnly(ro bool) {
	api.ro = ro
}
func (api *APIv2) SetBatchSize(n int) {
	api.batch = n
}
func (api *APIv2) SetQueryTimeout(dt time.Duration) {
	api.timeout = dt
}
func (api *APIv2) SetQueryLimit(n int) {
	api.limit = n
}

func (api *APIv2) RegisterDataOn(r *httprouter.Router) {
	if !api.ro {
		r.POST(prefix+"/write", api.ServeWrite)
		r.POST(prefix+"/delete", api.ServeDelete)
		r.POST(prefix+"/node/delete", api.ServeNodeDelete)
	}
	r.POST(prefix+"/read", api.ServeRead)
	r.GET(prefix+"/read", api.ServeRead)
	r.GET(prefix+"/formats", api.ServeFormats)
}

func (api *APIv2) RegisterQueryOn(r *httprouter.Router) {
	r.POST(prefix+"/query", api.ServeQuery)
	r.GET(prefix+"/query", api.ServeQuery)
}

func (api *APIv2) RegisterOn(r *httprouter.Router) {
	api.RegisterDataOn(r)
	api.RegisterQueryOn(r)
}

const (
	defaultFormat      = "nquads"
	hdrContentType     = "Content-Type"
	hdrContentEncoding = "Content-Encoding"
	hdrAccept          = "Accept"
	hdrAcceptEncoding  = "Accept-Encoding"
	contentTypeJSON    = "application/json"
	contentTypeJSONLD  = "application/ld+json"
)

func getFormat(r *http.Request, formKey string, acceptName string) *quad.Format {
	var format *quad.Format
	if formKey != "" {
		if name := r.FormValue("format"); name != "" {
			format = quad.FormatByName(name)
		}
	}
	if acceptName != "" && format == nil {
		specs := ParseAccept(r.Header, acceptName)
		// TODO: sort by Q
		if len(specs) != 0 {
			format = quad.FormatByMime(specs[0].Value)
		}
	}
	if format == nil {
		format = quad.FormatByName(defaultFormat)
	}
	return format
}

func readerFrom(r *http.Request, acceptName string) (io.ReadCloser, error) {
	if specs := ParseAccept(r.Header, acceptName); len(specs) != 0 {
		if s := specs[0]; s.Value == "gzip" {
			zr, err := gzip.NewReader(r.Body)
			if err != nil {
				return nil, err
			}
			return zr, nil
		}
	}
	return r.Body, nil
}

type nopWriteCloser struct {
	io.Writer
}

func (nopWriteCloser) Close() error { return nil }

func writerFrom(w http.ResponseWriter, r *http.Request, acceptName string) io.WriteCloser {
	if specs := ParseAccept(r.Header, acceptName); len(specs) != 0 {
		if s := specs[0]; s.Value == "gzip" {
			w.Header().Set(hdrContentEncoding, s.Value)
			zw := gzip.NewWriter(w)
			return zw
		}
	}
	return nopWriteCloser{Writer: w}
}

func (api *APIv2) handleForRequest(r *http.Request) (*graph.Handle, error) {
	return HandleForRequest(api.h, api.wtyp, api.wopt, r)
}

func (api *APIv2) ServeWrite(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
	defer r.Body.Close()
	if api.ro {
		jsonResponse(w, http.StatusForbidden, errors.New("database is read-only"))
		return
	}
	format := getFormat(r, "", hdrContentType)
	if format == nil || format.Reader == nil {
		jsonResponse(w, http.StatusBadRequest, errors.New("format is not supported for reading data"))
		return
	}
	rd, err := readerFrom(r, hdrContentEncoding)
	if err != nil {
		jsonResponse(w, http.StatusBadRequest, err)
		return
	}
	defer rd.Close()
	qr := format.Reader(rd)
	defer qr.Close()
	h, err := api.handleForRequest(r)
	if err != nil {
		jsonResponse(w, http.StatusBadRequest, err)
		return
	}
	qw := graph.NewWriter(h.QuadWriter)
	defer qw.Close()
	n, err := quad.CopyBatch(qw, qr, api.batch)
	if err != nil {
		jsonResponse(w, http.StatusInternalServerError, err)
		return
	}
	err = qw.Close()
	if err != nil {
		jsonResponse(w, http.StatusInternalServerError, err)
		return
	}
	w.Header().Set(hdrContentType, contentTypeJSON)
	fmt.Fprintf(w, `{"result": "Successfully wrote %d quads.", "count": %d}`+"\n", n, n)
}

func (api *APIv2) ServeDelete(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
	defer r.Body.Close()
	if api.ro {
		jsonResponse(w, http.StatusForbidden, errors.New("database is read-only"))
		return
	}
	format := getFormat(r, "", hdrContentType)
	if format == nil || format.Reader == nil {
		jsonResponse(w, http.StatusBadRequest, fmt.Errorf("format is not supported for reading quads"))
		return
	}
	rd, err := readerFrom(r, hdrContentEncoding)
	if err != nil {
		jsonResponse(w, http.StatusBadRequest, err)
		return
	}
	defer rd.Close()
	qr := format.Reader(r.Body)
	defer qr.Close()
	h, err := api.handleForRequest(r)
	if err != nil {
		jsonResponse(w, http.StatusBadRequest, err)
		return
	}
	qw := graph.NewRemover(h.QuadWriter)
	defer qw.Close()
	n, err := quad.CopyBatch(qw, qr, api.batch)
	if err != nil {
		jsonResponse(w, http.StatusInternalServerError, err)
		return
	}
	w.Header().Set(hdrContentType, contentTypeJSON)
	fmt.Fprintf(w, `{"result": "Successfully deleted %d quads.", "count": %d}`+"\n", n, n)
}

func (api *APIv2) ServeNodeDelete(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
	defer r.Body.Close()
	if api.ro {
		jsonResponse(w, http.StatusForbidden, errors.New("database is read-only"))
		return
	}
	format := getFormat(r, "", hdrContentType)
	if format == nil || format.UnmarshalValue == nil {
		jsonResponse(w, http.StatusBadRequest, fmt.Errorf("format is not supported for reading nodes"))
		return
	}
	const limit = 128*1024 + 1
	rd := io.LimitReader(r.Body, limit)
	data, err := ioutil.ReadAll(rd)
	if err != nil {
		jsonResponse(w, http.StatusBadRequest, err)
		return
	} else if len(data) == limit {
		jsonResponse(w, http.StatusBadRequest, fmt.Errorf("request data is too large"))
		return
	}
	v, err := format.UnmarshalValue(data)
	if err != nil {
		jsonResponse(w, http.StatusBadRequest, err)
		return
	} else if v == nil {
		jsonResponse(w, http.StatusBadRequest, fmt.Errorf("cannot remove nil value"))
		return
	}
	h, err := api.handleForRequest(r)
	if err != nil {
		jsonResponse(w, http.StatusBadRequest, err)
		return
	}
	err = h.RemoveNode(v)
	if err != nil {
		jsonResponse(w, http.StatusInternalServerError, err)
		return
	}
	w.Header().Set(hdrContentType, contentTypeJSON)
	const n = 1
	fmt.Fprintf(w, `{"result": "Successfully deleted %d nodes.", "count": %d}`+"\n", n, n)
}

type checkWriter struct {
	w       io.Writer
	written bool
}

func (w *checkWriter) Write(p []byte) (int, error) {
	w.written = true
	return w.w.Write(p)
}

func valuesFromString(s string) []quad.Value {
	if s == "" {
		return nil
	}
	arr := strings.Split(s, ",")
	out := make([]quad.Value, 0, len(arr))
	for _, s := range arr {
		out = append(out, quad.StringToValue(s))
	}
	return out
}

func (api *APIv2) ServeRead(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
	format := getFormat(r, "format", hdrAccept)
	if format == nil || format.Writer == nil {
		jsonResponse(w, http.StatusBadRequest, fmt.Errorf("format is not supported for reading data"))
		return
	}
	h, err := api.handleForRequest(r)
	if err != nil {
		jsonResponse(w, http.StatusBadRequest, err)
		return
	}
	values := shape.FilterQuads(
		valuesFromString(r.FormValue("sub")),
		valuesFromString(r.FormValue("pred")),
		valuesFromString(r.FormValue("obj")),
		valuesFromString(r.FormValue("label")),
	)
	it := values.BuildIterator(h.QuadStore).Iterate()
	qr := graph.NewResultReader(h.QuadStore, it)

	defer qr.Close()

	wr := writerFrom(w, r, hdrAcceptEncoding)
	defer wr.Close()

	cw := &checkWriter{w: wr}
	qwc := format.Writer(cw)
	defer qwc.Close()
	var qw quad.Writer = qwc
	if len(format.Mime) != 0 {
		w.Header().Set(hdrContentType, format.Mime[0])
	}
	if irif := r.FormValue("iri"); irif != "" {
		opts := quad.IRIOptions{
			Format: quad.IRIDefault,
		}
		switch irif {
		case "short":
			opts.Format = quad.IRIShort
		case "full":
			opts.Format = quad.IRIFull
		}
		qw = quad.IRIWriter(qw, opts)
	}
	if bw, ok := qw.(quad.BatchWriter); ok {
		_, err = quad.CopyBatch(bw, qr, api.batch)
	} else {
		_, err = quad.Copy(qw, qr)
	}
	if err != nil && !cw.written {
		jsonResponse(w, http.StatusInternalServerError, err)
		return
	} else if err != nil {
		// can do nothing here, since first byte (and header) was written
		// TODO: check if client just gone away
		clog.Errorf("read quads error: %v", err)
	}
}

func (api *APIv2) ServeFormats(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
	type Format struct {
		Id     string   `json:"id"`
		Read   bool     `json:"read,omitempty"`
		Write  bool     `json:"write,omitempty"`
		Nodes  bool     `json:"nodes,omitempty"`
		Ext    []string `json:"ext,omitempty"`
		Mime   []string `json:"mime,omitempty"`
		Binary bool     `json:"binary,omitempty"`
	}
	formats := quad.Formats()
	out := make([]Format, 0, len(formats))
	for _, f := range formats {
		out = append(out, Format{
			Id:  f.Name,
			Ext: f.Ext, Mime: f.Mime,
			Read: f.Reader != nil, Write: f.Writer != nil,
			Nodes:  f.UnmarshalValue != nil,
			Binary: f.Binary,
		})
	}
	w.Header().Set(hdrContentType, contentTypeJSON)
	json.NewEncoder(w).Encode(out)
}

func (api *APIv2) queryContext(r *http.Request) (ctx context.Context, cancel func()) {
	ctx = r.Context()
	if api.timeout > 0 {
		ctx, cancel = context.WithTimeout(ctx, api.timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	return ctx, cancel
}

func defaultErrorFunc(w query.ResponseWriter, err error) {
	data, _ := json.Marshal(err.Error())
	w.WriteHeader(http.StatusBadRequest)
	w.Write([]byte(`{"error": `))
	w.Write(data)
	w.Write([]byte("}\n"))
}

func writeResults(w io.Writer, r interface{}) {
	enc := json.NewEncoder(w)
	enc.SetEscapeHTML(false)
	enc.Encode(map[string]interface{}{
		"result": r,
	})
}

const maxQuerySize = 1024 * 1024 // 1 MB
func readLimit(r io.Reader) ([]byte, error) {
	lr := io.LimitReader(r, maxQuerySize).(*io.LimitedReader)
	data, err := ioutil.ReadAll(lr)
	if err != nil && lr.N <= 0 {
		err = errors.New("request is too large")
	}
	return data, err
}

func (api *APIv2) ServeQuery(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
	ctx, cancel := api.queryContext(r)
	defer cancel()
	vals := r.URL.Query()
	lang := vals.Get("lang")
	if lang == "" {
		jsonResponse(w, http.StatusBadRequest, "query language not specified")
		return
	}
	l := query.GetLanguage(lang)
	if l == nil {
		jsonResponse(w, http.StatusBadRequest, "unknown query language")
		return
	}
	errFunc := defaultErrorFunc
	if l.HTTPError != nil {
		errFunc = l.HTTPError
	}
	select {
	case <-ctx.Done():
		errFunc(w, ctx.Err())
		return
	default:
	}
	h, err := api.handleForRequest(r)
	if err != nil {
		errFunc(w, err)
		return
	}
	if l.HTTPQuery != nil {
		defer r.Body.Close()
		l.HTTPQuery(ctx, h.QuadStore, w, r.Body)
		return
	}
	if l.Session == nil {
		errFunc(w, errors.New("HTTP interface is not supported for this query language"))
		return
	}
	ses := l.Session(h.QuadStore)
	var qu string
	if r.Method == "GET" {
		qu = vals.Get("qu")
	} else {
		data, err := readLimit(r.Body)
		if err != nil {
			errFunc(w, err)
			return
		}
		qu = string(data)
	}
	if qu == "" {
		jsonResponse(w, http.StatusBadRequest, "query is empty")
		return
	}
	if clog.V(1) {
		clog.Infof("query: %s: %q", lang, qu)
	}

	opt := query.Options{
		Collation: query.JSON, // TODO: switch to JSON-LD by default when the time comes
		Limit:     api.limit,
	}
	if specs := ParseAccept(r.Header, hdrAccept); len(specs) != 0 {
		// TODO: sort by Q
		switch specs[0].Value {
		case contentTypeJSON:
			opt.Collation = query.JSON
		case contentTypeJSONLD:
			opt.Collation = query.JSONLD
		}
	}
	it, err := ses.Execute(ctx, qu, opt)
	if err != nil {
		errFunc(w, err)
		return
	}
	defer it.Close()

	var out []interface{}
	for it.Next(ctx) {
		out = append(out, it.Result())
	}
	if err = it.Err(); err != nil {
		errFunc(w, err)
		return
	}
	if opt.Collation == query.JSONLD {
		w.Header().Set(hdrContentType, contentTypeJSONLD)
	} else {
		w.Header().Set(hdrContentType, contentTypeJSON)
	}
	writeResults(w, out)
}
back to top