https://github.com/cilium/cilium
Raw File
Tip revision: a76227a7e37ef587cb4cf9af52ac4a5f43753557 authored by Martynas Pumputis on 24 May 2023, 09:20:59 UTC
WIP
Tip revision: a76227a
observable.go
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

// Package stream provides utilities for working with observable streams.
// Any type that implements the Observable interface can be transformed and
// consumed with these utilities.
package stream

import "context"

// Observable defines the Observe method for observing a stream of values
// of type T.
type Observable[T any] interface {
	// Observe a stream of values as long as the given context is valid.
	// 'next' is called for each item, and finally 'complete' is called
	// when the stream is complete, or an error has occurred.
	//
	// NOTE(review): presumably 'complete' receives nil when the stream ends
	// normally and a non-nil error otherwise -- confirm against the
	// implementations.
	//
	// Observable implementations are allowed to call 'next' and 'complete'
	// from any goroutine, but never concurrently.
	Observe(ctx context.Context, next func(T), complete func(error))
}

// FuncObservable adapts a plain function to the Observable interface, so
// that a function literal with the matching signature can be used wherever
// an Observable is expected.
type FuncObservable[T any] func(context.Context, func(T), func(error))

// Observe calls the wrapped function, handing it the context and the 'next'
// and 'complete' callbacks unchanged.
func (fn FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error)) {
	fn(ctx, next, complete)
}
back to top