https://github.com/SeelabFhdo/lemma
Tip revision: 2e9ccc882352116b253a7700b5ecf2c9316a5829 authored by Florian Rademacher on 24 March 2023, 08:03:13 UTC
Eclipse Launchers: Fix Docker image selection
Eclipse Launchers: Fix Docker image selection
Tip revision: 2e9ccc8
ServiceModelSourceValidator.kt
package de.fhdo.lemma.model_processing.code_generation.springcloud.kafka.validators
import com.google.common.base.Function
import de.fhdo.lemma.data.Type
import de.fhdo.lemma.model_processing.annotations.Before
import de.fhdo.lemma.model_processing.annotations.SourceModelValidator
import de.fhdo.lemma.model_processing.code_generation.springcloud.kafka.cqrsAlias
import de.fhdo.lemma.model_processing.code_generation.springcloud.kafka.domainEventsAlias
import de.fhdo.lemma.model_processing.utils.countInputParameters
import de.fhdo.lemma.model_processing.utils.countResultParameters
import de.fhdo.lemma.model_processing.utils.getAllServiceAspects
import de.fhdo.lemma.model_processing.utils.getServiceAspect
import de.fhdo.lemma.model_processing.utils.getEndpointAddresses
import de.fhdo.lemma.model_processing.utils.hasServiceAspect
import de.fhdo.lemma.model_processing.code_generation.springcloud.kafka.kafkaAlias
import de.fhdo.lemma.model_processing.languages.convertToAbsoluteFileUrisInPlace
import de.fhdo.lemma.model_processing.phases.validation.AbstractXtextModelValidator
import de.fhdo.lemma.model_processing.utils.getPropertyValue
import de.fhdo.lemma.model_processing.utils.hasInputParameters
import de.fhdo.lemma.model_processing.utils.hasResultParameters
import de.fhdo.lemma.model_processing.utils.isInputParameter
import de.fhdo.lemma.model_processing.utils.isResultParameter
import de.fhdo.lemma.service.ImportedServiceAspect
import de.fhdo.lemma.service.Interface
import de.fhdo.lemma.service.Microservice
import de.fhdo.lemma.service.Operation
import de.fhdo.lemma.service.ServiceModel
import de.fhdo.lemma.service.ServicePackage
import de.fhdo.lemma.technology.CommunicationType
import de.fhdo.lemma.typechecking.TypeChecker
import de.fhdo.lemma.typechecking.TypesNotCompatibleException
import de.fhdo.lemma.utils.LemmaUtils
import org.eclipse.emf.ecore.resource.Resource
import org.eclipse.xtext.validation.Check
import java.lang.IllegalArgumentException
/**
* Validator for service source models. We implement the validation as an Xtext source model validator to take advantage
* of LEMMA's Live Validation capabilities.
*
* @author [Florian Rademacher](mailto:florian.rademacher@fh-dortmund.de)
*/
@SourceModelValidator
@Suppress("unused")
internal class ServiceModelSourceValidator : AbstractXtextModelValidator() {
override fun getSupportedFileExtensions() = setOf("services")
/**
* Prepare import model paths before validation
*/
@Before
@Suppress("unused")
private fun prepareImportModelPaths(resource: Resource) {
val sm = resource.contents[0] as ServiceModel
sm.imports.convertToAbsoluteFileUrisInPlace(resource)
}
/**
* Check that a source [Microservice] with the Kafka technology has a valid Kafka bootstrap address specified via
* the BootstrapAddress aspect
*/
@Check
@Suppress("unused")
private fun checkBootstrapAddress(microservice: Microservice) {
// Check aspect existence
val kafkaAlias = microservice.kafkaAlias ?: return
val bootstrapAddressAspect = microservice.getServiceAspect(kafkaAlias, "BootstrapAddress")
if (bootstrapAddressAspect == null) {
error("Kafka microservice requires BootstrapAddress aspect", ServicePackage.Literals.MICROSERVICE__NAME)
return
}
}
/**
* Warn if a microservice has an Avro schema registry address, but does not comprise Avro participant operations
*/
@Check
@Suppress("unused")
private fun warnAvroRegistryAddress(microservice: Microservice) {
val kafkaAlias = microservice.kafkaAlias ?: return
if (!microservice.hasServiceAspect(kafkaAlias, "AvroRegistryAddress"))
return
val hasAvroParticipants = microservice.interfaces.map { it.operations }.flatten()
.any { it.hasServiceAspect(kafkaAlias, "AvroParticipant") }
if (!hasAvroParticipants)
warning("Microservice specifies Avro schema registry address, but does not comprise Avro-specific Kafka " +
"operation participants", microservice, ServicePackage.Literals.MICROSERVICE__NAME)
}
/**
* Check that a microservice with an Avro schema registry address has Avro participant operations
*/
@Check
@Suppress("unused")
private fun checkAvroRegistryAddress(operation: Operation) {
val microservice = operation.`interface`.microservice
val kafkaAlias = microservice.kafkaAlias ?: return
if (operation.hasServiceAspect(kafkaAlias, "AvroParticipant") &&
!microservice.hasServiceAspect(kafkaAlias, "AvroRegistryAddress"))
error("Microservice of this Avro-specific Kafka operation participant does not specify Avro schema " +
"registry address", operation, ServicePackage.Literals.OPERATION__NAME)
}
/**
* Check source [Operation] that represents a Kafka participant as indicated by the Participant technology aspect
*/
@Check
@Suppress("unused")
private fun checkKafkaParticipant(operation: Operation) {
val kafkaAlias = operation.`interface`.microservice.kafkaAlias ?: return
val participantAspects = operation.getAllServiceAspects(kafkaAlias, "Participant").toMutableList()
participantAspects.addAll(operation.getAllServiceAspects(kafkaAlias, "AvroParticipant"))
if (participantAspects.isEmpty())
return
// A Kafka participant must have at least one asynchronous parameter, and at most one input and result
// asynchronous parameter
val asynchronousInputParameterCount = operation.countInputParameters(CommunicationType.ASYNCHRONOUS)
val asynchronousResultParameterCount = operation.countResultParameters(CommunicationType.ASYNCHRONOUS)
if (asynchronousInputParameterCount == 0 && asynchronousResultParameterCount == 0)
error("A Kafka operation participant must specify at least one asynchronous parameter", operation,
ServicePackage.Literals.OPERATION__NAME)
else if (asynchronousInputParameterCount > 1)
error("A Kafka operation participant may specify at most one asynchronous input parameter", operation,
ServicePackage.Literals.OPERATION__NAME)
else if (asynchronousResultParameterCount > 1)
error("A Kafka operation participant may specify at most one asynchronous result parameter", operation,
ServicePackage.Literals.OPERATION__NAME)
// Check for duplicate topics
val topics = participantAspects.map { it.getPropertyValue("topic")!! }
val duplicateIndex = LemmaUtils.getDuplicateIndex(topics, Function<String, String> { it })
if (duplicateIndex > -1) {
val aspect = participantAspects[duplicateIndex]
val duplicateTopic = topics[duplicateIndex]
error("""Duplicate topic "$duplicateTopic" in Participant aspect""", aspect,
ServicePackage.Literals.IMPORTED_SERVICE_ASPECT__VALUES)
}
// A Kafka participant receiving asynchronous values via incoming asynchronous parameters must specify a Kafka
// consumer group
participantAspects.forEach {
if (asynchronousInputParameterCount > 0 && it.getPropertyValue("consumerGroup") == null)
error("A Kafka operation participant with incoming asynchronous parameters must specify a consumer " +
"group", it, ServicePackage.Literals.IMPORTED_SERVICE_ASPECT__IMPORTED_ASPECT)
}
}
/**
* Check error handling configuration of an [Operation] with the ErrorHandlingConfiguration aspect
*/
@Check
@Suppress("unused")
private fun checkErrorHandlingConfiguration(operation: Operation) {
val kafkaAlias = operation.`interface`.microservice.kafkaAlias ?: return
val errorHandlingConfigurationAspect = operation.getServiceAspect(kafkaAlias, "ErrorHandlingConfiguration")
?: return
// Retries upon error must not be negative
val retriesUponError = errorHandlingConfigurationAspect.getPropertyValue("retriesUponError")?.toLong()
if (retriesUponError != null && retriesUponError < 0)
error("Value of retriesUponError property must not be negative", errorHandlingConfigurationAspect,
ServicePackage.Literals.IMPORTED_SERVICE_ASPECT__IMPORTED_ASPECT)
// Warn if error handling was disabled by means of DomainEvents.Consumer aspect
val domainEventsAlias = operation.`interface`.microservice.domainEventsAlias
val disableErrorHandling = if (domainEventsAlias !== null)
operation.getServiceAspect(domainEventsAlias, "Consumer")
?.getPropertyValue("disableErrorHandling") == "true"
else
false
if (disableErrorHandling)
warning("ErrorHandlingConfiguration will not be effective, because error handling was disabled via the " +
"DomainEvents.Consumer aspect", errorHandlingConfigurationAspect,
ServicePackage.Literals.IMPORTED_SERVICE_ASPECT__IMPORTED_ASPECT)
// TODO For future Spring versions (retry interval)
/*val retryInterval = errorHandlingConfigurationAspect.getPropertyValue("retryInterval")?.toLong()
if (retryInterval != null && retryInterval < 0)
error("Value of retryInterval property must not be negative", errorHandlingConfigurationAspect,
ServicePackage.Literals.IMPORTED_SERVICE_ASPECT__IMPORTED_ASPECT)*/
}
/**
* Check that for all operations of the CQRS query side of a microservice, type-compatible sender methods for
* synchronization purposes exist in the command side of the microservice
*/
@Check
@Suppress("unused")
private fun checkCommandSideSenderExistence(microservice: Microservice) {
val kafkaAlias = microservice.kafkaAlias ?: return
val cqrsAlias = microservice.cqrsAlias ?: return
val querySideAspect = microservice.getServiceAspect(cqrsAlias, "QuerySide") ?: return
val querySideOperations = microservice.interfaces.map { it.operations }.flatten()
.filter { it.hasParticipantAspect(kafkaAlias) }
// Command side microservices must be required by query side microservices to be identifiable as a corresponding
// side
val correspondingKafkaEnabledMicroservices = microservice.requiredMicroservices
.filter { it.microservice.isCorrespondingSide(querySideAspect) && it.microservice.kafkaAlias != null }
.associateWith { it.microservice.kafkaAlias!! }
val commandSideOperations = mutableMapOf<Operation, String>()
correspondingKafkaEnabledMicroservices.forEach { (correspondingMicroservice, correspondingKafkaAlias) ->
val interfaces = correspondingMicroservice.microservice.interfaces
val correspondingSideOperations = interfaces.getCorrespondingSideOperations(querySideAspect)
correspondingSideOperations.filter { it.hasParticipantAspect(correspondingKafkaAlias) }.forEach {
commandSideOperations[it] = correspondingKafkaAlias
}
}
// Check if type-compatible command side operation exists for each query side operation
querySideOperations.forEach { querySideOperation ->
val existsCompatibleSender = commandSideOperations.any { (commandSideOperation, commandSideKafkaAlias) ->
when {
commandSideOperation.hasServiceAspect(commandSideKafkaAlias, "AvroParticipant") ->
querySideOperation.canReceiveParametersWithKafka(commandSideOperation, "AvroParticipant")
else ->
querySideOperation.canReceiveParametersWithKafka(commandSideOperation, "Participant")
}
}
if (!existsCompatibleSender)
warning("No compatible command side sender found for query side receiver", querySideOperation,
ServicePackage.Literals.OPERATION__NAME)
}
}
/**
* Check if this [Operation] is a Kafka participant aspect from the Kafka technology model with the given
* [kafkaAlias]
*/
private fun Operation.hasParticipantAspect(kafkaAlias: String)
= hasServiceAspect(kafkaAlias, "AvroParticipant") || hasServiceAspect(kafkaAlias, "Participant")
/**
* Get side operations from this list of [Interface] instances. Command side operations must exhibit asynchronous
* result parameters and query side operations must exhibit asynchronous input parameters. The side is represented
* by the given [sideAspectName].
*/
private fun List<Interface>.getSideOperations(sideAspectName: String) : List<Operation> {
val sideFilter: (Operation) -> Boolean = when(sideAspectName) {
"CommandSide" -> { { it.hasResultParameters(CommunicationType.ASYNCHRONOUS) } }
"QuerySide" -> { { it.hasInputParameters(CommunicationType.ASYNCHRONOUS) } }
else -> throw IllegalArgumentException("Unsupported side aspect $sideAspectName")
}
return map { it.operations }.flatten().filter(sideFilter)
}
/**
* Check if this [Microservice] is the corresponding side of the side identified by the [sideAspect]. For example,
* the corresponding side microservice for the QuerySide aspect would the microservice with the CommandSide aspect.
* Furthermore, for two sides to correspond the logicalService property of the QuerySide and CommandSide aspects
* must be equal.
*/
private fun Microservice.isCorrespondingSide(sideAspect: ImportedServiceAspect) : Boolean {
if (cqrsAlias == null)
return false
val correspondingAspectName = sideAspect.getCorrespondingSideAspectName()
val logicalService = sideAspect.getPropertyValue("logicalService")
val correspondingSideAspect = getServiceAspect(cqrsAlias!!, correspondingAspectName) ?: return false
val correspondingSideLogicalService = correspondingSideAspect.getPropertyValue("logicalService")
return logicalService == correspondingSideLogicalService
}
/**
* Get corresponding side aspect name for this [ImportedServiceAspect]
*/
private fun ImportedServiceAspect.getCorrespondingSideAspectName()
= when(importedAspect.name) {
"CommandSide" -> "QuerySide"
"QuerySide" -> "CommandSide"
else -> throw IllegalArgumentException("Unsupported side aspect ${importedAspect.name}")
}
/**
* Get all operations from this list of [Interface] instances that correspond to the side represented by the
* given [sideAspect]. If, for example, the [sideAspect] represents the QuerySide aspect, the returned operations
* will exhibit the CommandSide aspect.
*/
private fun List<Interface>.getCorrespondingSideOperations(sideAspect: ImportedServiceAspect)
= getSideOperations(sideAspect.getCorrespondingSideAspectName())
/**
* Check if this [Operation] can receive parameters from the given [senderOperation] by means of Kafka
*/
private fun Operation.canReceiveParametersWithKafka(senderOperation: Operation, aspectName: String) : Boolean {
// Kafka aliases are required for sender and receiver
val senderKafkaAlias = senderOperation.`interface`.microservice.kafkaAlias ?: return false
val receiverKafkaAlias = `interface`.microservice.kafkaAlias ?: return false
// Matching topics are required for sender and receiver
val senderTopics = senderOperation.getAllServiceAspects(senderKafkaAlias, aspectName)
.mapNotNull { it.getPropertyValue("topic") }
if (senderTopics.isEmpty())
return false
val receiverTopics = getAllServiceAspects(receiverKafkaAlias, aspectName)
.filter { it.getPropertyValue("consumerGroup") != null }
.mapNotNull { it.getPropertyValue("topic") }
if (receiverTopics.isEmpty())
return false
val existMatchingTopics = receiverTopics.any { it in senderTopics }
if (!existMatchingTopics)
return false
// Result parameters of sender and input parameters of receiver must be partially type-compatible
val senderParameters = senderOperation.parameters.filter {
it.isResultParameter(CommunicationType.ASYNCHRONOUS)
}
val receiverParameters = parameters.filter { it.isInputParameter(CommunicationType.ASYNCHRONOUS) }
return receiverParameters.any { receiverParam ->
senderParameters.any { senderParam ->
WrappedTypeChecker.typesCompatible(receiverParam.effectiveType, senderParam.effectiveType)
}
}
}
}
/**
* Helper Singleton that wraps LEMMA's type checker and provides convenience methods to access it.
*
* @author [Florian Rademacher](mailto:florian.rademacher@fh-dortmund.de)
*/
private object WrappedTypeChecker {
val typeChecker = TypeChecker()
/**
* Check if the given [receiverType] is type-compatible with the given [providerType]. That is, the [receiverType]
* may receive type instances of the [providerType].
*/
fun typesCompatible(receiverType: Type, providerType: Type)
= try {
typeChecker.checkTypeCompatibility(receiverType, providerType)
true
} catch (ex: TypesNotCompatibleException) {
false
}
}