Revision 9831d981fc90760e810066f2f7d67bd2dc9e6b1c authored by Daniel Borkmann on 18 November 2020, 15:46:58 UTC, committed by Daniel Borkmann on 20 November 2020, 10:28:57 UTC
Example invocation:

  ./daemon/cilium-agent --enable-ipv4=true --enable-ipv6=true \
      --datapath-mode=lb-only --bpf-lb-algorithm=maglev       \
      --bpf-lb-maglev-table-size=65521 --bpf-lb-mode=dsr      \
      --bpf-lb-acceleration=native --bpf-lb-dsr-dispatch=ipip \
      --devices=enp2s0np0

Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
1 parent 03f14fe
Raw File
request.go
// Copyright 2017 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"

	"github.com/cilium/cilium/pkg/flowdebug"

	"github.com/optiopay/kafka/proto"
	log "github.com/sirupsen/logrus"
)

// RequestMessage represents a Kafka request message
type RequestMessage struct {
	kind     int16
	version  int16
	clientID string
	rawMsg   []byte
	request  interface{}
	// Maintain a map of all topics in the request.  We should
	// allow the request only if all topics in the request are
	// allowed by the rules.
	topics map[string]struct{}
}

// CorrelationID represents the correlation id as defined in the Kafka protocol
// specification
type CorrelationID uint32

// GetAPIKey returns the kind of Kafka request
func (req *RequestMessage) GetAPIKey() int16 {
	return req.kind
}

// GetRaw returns the raw Kafka request
func (req *RequestMessage) GetRaw() []byte {
	return req.rawMsg
}

// GetVersion returns the version Kafka request
func (req *RequestMessage) GetVersion() int16 {
	return req.version
}

// GetCorrelationID returns the Kafka request correlationID
func (req *RequestMessage) GetCorrelationID() CorrelationID {
	if len(req.rawMsg) >= 12 {
		return CorrelationID(binary.BigEndian.Uint32(req.rawMsg[8:12]))
	}

	return CorrelationID(0)
}

// SetCorrelationID modified the correlation ID of the Kafka request
func (req *RequestMessage) SetCorrelationID(id CorrelationID) {
	if len(req.rawMsg) >= 12 {
		binary.BigEndian.PutUint32(req.rawMsg[8:12], uint32(id))
	}
}

func (req *RequestMessage) extractVersion() int16 {
	return int16(binary.BigEndian.Uint16(req.rawMsg[6:8]))
}

func (req *RequestMessage) extractClientID() string {
	if req.version == 0 || len(req.rawMsg) < 14 {
		return "" // 0 version has no client ID
	}
	// ref. https://kafka.apache.org/protocol#protocol_details
	length := int16(binary.BigEndian.Uint16(req.rawMsg[12:14]))
	if length <= 0 || len(req.rawMsg) < 14+int(length) {
		return ""
	}
	return string(req.rawMsg[14 : 14+int(length)])
}

// String returns a human readable representation of the request message
func (req *RequestMessage) String() string {
	b, err := json.Marshal(req.request)
	if err != nil {
		return err.Error()
	}

	return fmt.Sprintf("apiKey=%d,apiVersion=%d,len=%d: %s",
		req.kind, req.version, len(req.rawMsg), string(b))
}

// GetTopics returns the Kafka request list of topics
func (req *RequestMessage) GetTopics() []string {
	if req.request == nil {
		return nil
	}
	topics := make([]string, 0, len(req.topics))
	for topic := range req.topics {
		topics = append(topics, topic)
	}
	return topics
}

func (req *RequestMessage) setTopics() {
	var topics []string
	switch val := req.request.(type) {
	case *proto.ProduceReq:
		topics = produceTopics(val)
	case *proto.FetchReq:
		topics = fetchTopics(val)
	case *proto.OffsetReq:
		topics = offsetTopics(val)
	case *proto.MetadataReq:
		topics = metadataTopics(val)
	case *proto.OffsetCommitReq:
		topics = offsetCommitTopics(val)
	case *proto.OffsetFetchReq:
		topics = offsetFetchTopics(val)
	}
	req.topics = make(map[string]struct{}, len(topics))
	for _, topic := range topics {
		req.topics[topic] = struct{}{}
	}
}

func produceTopics(req *proto.ProduceReq) []string {
	topics := make([]string, len(req.Topics))
	for k, topic := range req.Topics {
		topics[k] = topic.Name
	}
	return topics
}

func fetchTopics(req *proto.FetchReq) []string {
	topics := make([]string, len(req.Topics))
	for k, topic := range req.Topics {
		topics[k] = topic.Name
	}
	return topics
}

func offsetTopics(req *proto.OffsetReq) []string {
	topics := make([]string, len(req.Topics))
	for k, topic := range req.Topics {
		topics[k] = topic.Name
	}
	return topics
}

func metadataTopics(req *proto.MetadataReq) []string {
	topics := req.Topics
	return topics
}

func offsetCommitTopics(req *proto.OffsetCommitReq) []string {
	topics := make([]string, len(req.Topics))
	for k, topic := range req.Topics {
		topics[k] = topic.Name
	}
	return topics
}

func offsetFetchTopics(req *proto.OffsetFetchReq) []string {
	topics := make([]string, len(req.Topics))
	for k, topic := range req.Topics {
		topics[k] = topic.Name
	}
	return topics
}

// CreateResponse creates a response message based on the provided request
// message. The response will have the specified error code set in all topics
// and embedded partitions.
func (req *RequestMessage) CreateResponse(err error) (*ResponseMessage, error) {
	switch val := req.request.(type) {
	case *proto.ProduceReq:
		return createProduceResponse(val, err)
	case *proto.FetchReq:
		return createFetchResponse(val, err)
	case *proto.OffsetReq:
		return createOffsetResponse(val, err)
	case *proto.MetadataReq:
		return createMetadataResponse(val, err)
	case *proto.ConsumerMetadataReq:
		return createConsumerMetadataResponse(val, err)
	case *proto.OffsetCommitReq:
		return createOffsetCommitResponse(val, err)
	case *proto.OffsetFetchReq:
		return createOffsetFetchResponse(val, err)
	case nil:
		return nil, fmt.Errorf("unsupported request API key %d", req.kind)
	default:
		// The switch cases above must correspond exactly to the switch cases
		// in ReadRequest.
		log.Panic(fmt.Sprintf("Kafka API key not handled: %d", req.kind))
	}
	return nil, nil
}

// CreateAuthErrorResponse creates Authorization error response message for 'req'
func (req *RequestMessage) CreateAuthErrorResponse() (*ResponseMessage, error) {
	return req.CreateResponse(proto.ErrTopicAuthorizationFailed)
}

// ReadRequest will read a Kafka request from an io.Reader and return the
// message or an error.
func ReadRequest(reader io.Reader) (*RequestMessage, error) {
	req := &RequestMessage{}
	var err error

	req.kind, req.rawMsg, err = proto.ReadReq(reader)
	if err != nil {
		return nil, err
	}

	if len(req.rawMsg) < 12 {
		return nil, fmt.Errorf("unexpected end of request (length < 12 bytes)")
	}
	req.version = req.extractVersion()
	req.clientID = req.extractClientID()

	var nilSlice []byte
	buf := bytes.NewBuffer(append(nilSlice, req.rawMsg...))

	switch req.kind {
	case proto.ProduceReqKind:
		req.request, err = proto.ReadProduceReq(buf)
	case proto.FetchReqKind:
		req.request, err = proto.ReadFetchReq(buf)
	case proto.OffsetReqKind:
		req.request, err = proto.ReadOffsetReq(buf)
	case proto.MetadataReqKind:
		req.request, err = proto.ReadMetadataReq(buf)
	case proto.ConsumerMetadataReqKind:
		req.request, err = proto.ReadConsumerMetadataReq(buf)
	case proto.OffsetCommitReqKind:
		req.request, err = proto.ReadOffsetCommitReq(buf)
	case proto.OffsetFetchReqKind:
		req.request, err = proto.ReadOffsetFetchReq(buf)
	default:
		if flowdebug.Enabled() {
			log.Debugf("Unknown Kafka request API key: %d in %s", req.kind, req.String())
		}
	}

	if err != nil {
		if flowdebug.Enabled() {
			log.WithError(err).Debugf("Ignoring Kafka message %s due to parse error", req.String())
		}
		return nil, err
	}

	req.setTopics()

	return req, nil
}
back to top