Skip to content

Commit

Permalink
Merge branch 'release/13.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
acaproni committed Apr 3, 2024
2 parents 8a407b2 + 36ce13a commit b031d07
Show file tree
Hide file tree
Showing 17 changed files with 424 additions and 335 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.eso.ias.asce.exceptions.{PropNotFoundException, WrongPropValue}
import org.eso.ias.asce.transfer.{IasIO, IasioInfo, ScalaTransferExecutor}
import org.eso.ias.logging.IASLogger
import org.eso.ias.types.IASTypes.*
import org.eso.ias.types.{Alarm, OperationalMode, Priority}
import org.eso.ias.types.{Alarm, OperationalMode, Priority, IasValidity}

import scala.util.control.NonFatal

Expand Down Expand Up @@ -124,20 +124,53 @@ extends ScalaTransferExecutor[Alarm](cEleId,cEleRunningId,validityTimeFrame,prop
// returns Map(k2 -> B,C, k1 -> A, k3 -> D)
grouped.view.mapValues(_.map(_._2).toList.mkString(",")).toMap
}


/**
* Set the validity constraint to apply to the TF
*
* The validity depnds on the validity of the inputs that contribute
* to the determination of the output
* @see https://github.com/IntegratedAlarmSystem-Group/ias/issues/201
*
* @param isSet True if the output is set, Flase if cleared
* @param setAlarms The IDs of the alarms in input that are SET
* @param clearedAlarms The IDs of the alarms in input that are SET
* @return the IDs of the alarm to use to set the validity of the output
*/
def buildValidityConstraint(
isSet: Boolean,
setAlarms: Iterable[IasIO[Alarm]],
clearedAlarms: Iterable[IasIO[Alarm]]): Option[Set[String]] = {

val validSetAlarms = setAlarms.filter(alarm => {alarm.validity==IasValidity.RELIABLE}).toSet
val validUnsetAlarms = clearedAlarms.filter(alarm => {alarm.validity==IasValidity.RELIABLE}).toSet

val totAlarmsSz = setAlarms.size+clearedAlarms.size

if (isSet && validSetAlarms.size>=threshold) Some(validSetAlarms.map(_.id))
else if (!isSet && validUnsetAlarms.size>totAlarmsSz-threshold) Some(validUnsetAlarms.map(_.id))
else None
}

/**
* @see ScalaTransferExecutor#eval
*/
def eval(compInputs: Map[String, IasIO[_]], actualOutput: IasIO[Alarm]): IasIO[Alarm] = {

// Get the active alarms in input
// Get the active (SET) alarms in input
val activeAlarms: Iterable[IasIO[Alarm]] = compInputs.values.filter(input =>{
input.value.isDefined && input.value.get.asInstanceOf[Alarm].isSet
input.value.isDefined && input.value.get.asInstanceOf[Alarm].isSet()
}).map(_.asInstanceOf[IasIO[Alarm]])

// Get the inactive (CLEARED) alarms in input
val inactiveAlarms: Iterable[IasIO[Alarm]] = compInputs.values.filter(input =>{
input.value.isDefined && input.value.get.asInstanceOf[Alarm].isCleared()
}).map(_.asInstanceOf[IasIO[Alarm]])

val actualAlarm = actualOutput.value.getOrElse(Alarm.getInitialAlarmState(priorityFromCDB))

val newAlarm: Alarm = actualAlarm.setIf (activeAlarms.size>=threshold)
val newAlarm: Alarm = actualAlarm.setIf(activeAlarms.size>=threshold)

// The properties of the output
val props = if (newAlarm.isSet) {
Expand All @@ -153,7 +186,9 @@ extends ScalaTransferExecutor[Alarm](cEleId,cEleRunningId,validityTimeFrame,prop
getOutputMode(compInputs.values.map(_.mode))
}

actualOutput.updateValue(newAlarm).updateMode(mode).updateProps(props)
val validityConstraints = buildValidityConstraint(newAlarm.isSet(), activeAlarms, inactiveAlarms)

actualOutput.updateValue(newAlarm).updateMode(mode).updateProps(props).setValidityConstraint(validityConstraints)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class TestMultiplicityTF extends AnyFlatSpec with BeforeAndAfterEach {
*
* @param asce: the ASCE to check the alarm state of the output
* @param alarmState: The expected alarm in output
* @return True if the alarm is SET, False otherwise
*/
def checkAlarmActivation(asce: ComputingElement[Alarm], isSet: Boolean): Boolean = {
assert(asce.isOutputAnAlarm,"The output is not an alarm")
Expand Down Expand Up @@ -142,6 +143,49 @@ class TestMultiplicityTF extends AnyFlatSpec with BeforeAndAfterEach {
ret
}

/**
* Check if in the set, the value with the passed id has the passed validity
*
* @param values The set of values from where to get the IASValue with the given ID
* @param id the ID of the IASValue to check
* @param validity the requested validity
* @return True iif the IASValue with the given id has a validity equal to validity
*/
def checkValidity(values: Set[IASValue[_]], id: String, validity: IasValidity): Boolean = {
val set: Set[IASValue[?]]=values.filter(_.id==id)
assert(set.size==1)
set.forall(_.iasValidity==validity)
}

/**
* Set the validity of the passed value.
* Being IASValue immutable, a new object is built if the validity
* to set differs from the validity of the passed IASValue
*
* @param iasValue the value whose validity must be changed
* @param validity the validity to set
* @return a IASValue with the validity set
*/
def setValidity(iasValue: IASValue[_], validity: IasValidity): IASValue[_] = {
if (iasValue.iasValidity==validity) return iasValue
val value = iasValue.asInstanceOf[IASValue[Alarm]]
return new IASValue[Alarm](
value.value,
value.mode,
validity,
value.fullRunningId,
value.valueType,
value.readFromMonSysTStamp,
value.productionTStamp,
value.sentToConverterTStamp,
value.receivedFromPluginTStamp,
value.convertedProductionTStamp,
value.sentToBsdbTStamp,
value.readFromBsdbTStamp,
value.dependentsFullRuningIds,
value.props);
}

behavior of "The scala MultiplicityTF executor with given priority"

it must "Correctly load and shutdown the multiplicity TF executor" in {
Expand Down Expand Up @@ -335,4 +379,134 @@ class TestMultiplicityTF extends AnyFlatSpec with BeforeAndAfterEach {
scalaCompWithNoPriority.get.update(clearedMPs)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, false))
}

// The following is to test the validity of the output when some of the inputs
// are valid and some invalid
behavior of "The validity constraints of the output of the scala MultiplicityTF"

it must "be not empty when enough SET inputs are reliable" in {
// Change all inputs to SET to trigger the TF
val changedMPs: Set[IASValue[?]] = inputsMPs.map ( iasio => iasio.updateValue(Some(Alarm.getInitialAlarmState.set())).updateProdTStamp(System.currentTimeMillis()).toIASValue())
// At the beginning all the inputs are UNRELIABLE
changedMPs.foreach( value => {assert(value.iasValidity==IasValidity.UNRELIABLE)})
scalaCompWithNoPriority.get.update(changedMPs)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, true))
assert(scalaCompWithNoPriority.get.output.validityConstraint.isEmpty)

// Activate 3 (threshold) alarms so the output is still SET
// but now SET alarms in input will be RELIABLE, UNSET UNRELIABLE
//
// We expect the output to be SET and RELIABLE (unreliable unset ignored)
val activated3 = activate(3)
val activatedSetMPs = activated3.map( iasValue => {
if (iasValue.asInstanceOf[IASValue[Alarm]].value.isSet()) setValidity(iasValue, IasValidity.RELIABLE)
else setValidity(iasValue, IasValidity.UNRELIABLE)
})
scalaCompWithNoPriority.get.update(activatedSetMPs)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, true))
assert(scalaCompWithNoPriority.get.output.validityConstraint.nonEmpty)
val constraintIds = scalaCompWithNoPriority.get.output.validityConstraint.get
assert(constraintIds.size==3)
assert(constraintIds.forall(checkValidity(activatedSetMPs,_,IasValidity.RELIABLE)))

// Same test with 4 (>threshold) SET alarms
val activated4 = activate(4)
val activatedSetMPs4 = activated4.map( iasValue => {
if (iasValue.asInstanceOf[IASValue[Alarm]].value.isSet()) setValidity(iasValue, IasValidity.RELIABLE)
else setValidity(iasValue, IasValidity.UNRELIABLE)
})
scalaCompWithNoPriority.get.update(activatedSetMPs4)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, true))
assert(scalaCompWithNoPriority.get.output.validityConstraint.nonEmpty)
val constraintIds4 = scalaCompWithNoPriority.get.output.validityConstraint.get
assert(constraintIds4.size==4)
assert(constraintIds4.forall(checkValidity(activatedSetMPs4,_,IasValidity.RELIABLE)))
}

it must "be empty when not enough SET inputs are reliable" in {
// Change all inputs to SET to trigger the TF
val changedMPs: Set[IASValue[?]] = inputsMPs.map ( iasio => iasio.updateValue(Some(Alarm.getInitialAlarmState.set())).updateProdTStamp(System.currentTimeMillis()).toIASValue())
// At the beginning all the inputs are UNRELIABLE
changedMPs.foreach( value => {assert(value.iasValidity==IasValidity.UNRELIABLE)})
scalaCompWithNoPriority.get.update(changedMPs)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, true))
assert(scalaCompWithNoPriority.get.output.validityConstraint.isEmpty)

// Activate 2 (threshold) alarms so the output is still SET
// but now SET alarms in input will be RELIABLE, UNSET UNRELIABLE
//
// We expect the output to be UNSET and RELIABLE (unreliable unset ignored)
val activated2 = activate(2)
val activatedSetMPs = activated2.map( iasValue => {
if (iasValue.asInstanceOf[IASValue[Alarm]].value.isSet()) setValidity(iasValue, IasValidity.RELIABLE)
else setValidity(iasValue, IasValidity.UNRELIABLE)
})
scalaCompWithNoPriority.get.update(activatedSetMPs)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, false))
assert(scalaCompWithNoPriority.get.output.validityConstraint.isEmpty)
}

it must "be non empty when enough UNSET inputs are reliable" in {
// Change all inputs to SET to trigger the TF
val changedMPs: Set[IASValue[?]] = inputsMPs.map ( iasio => iasio.updateValue(Some(Alarm.getInitialAlarmState.set())).updateProdTStamp(System.currentTimeMillis()).toIASValue())
// At the beginning all the inputs are UNRELIABLE
changedMPs.foreach( value => {assert(value.iasValidity==IasValidity.UNRELIABLE)})
scalaCompWithNoPriority.get.update(changedMPs)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, true))
assert(scalaCompWithNoPriority.get.output.validityConstraint.isEmpty)

// Activate 2 (threshold) alarms so the output is still SET
// but now UNSET alarms in input will be RELIABLE, UNSET UNRELIABLE
//
// We expect the output to be UNSET and RELIABLE (unreliable unset ignored)
val activated2 = activate(2)
val activatedUnSetMPs = activated2.map( iasValue => {
if (!iasValue.asInstanceOf[IASValue[Alarm]].value.isSet()) setValidity(iasValue, IasValidity.RELIABLE)
else setValidity(iasValue, IasValidity.UNRELIABLE)
})
scalaCompWithNoPriority.get.update(activatedUnSetMPs)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, false))
assert(scalaCompWithNoPriority.get.output.validityConstraint.nonEmpty)
val constraintIds2 = scalaCompWithNoPriority.get.output.validityConstraint.get
assert(constraintIds2.size==3)
assert(constraintIds2.forall(checkValidity(activatedUnSetMPs,_,IasValidity.RELIABLE)))

// Same as before but with 4 RELIABLE UNSET inputs
val activated1 = activate(1)
val activatedUnSetMPs4 = activated1.map( iasValue => {
if (!iasValue.asInstanceOf[IASValue[Alarm]].value.isSet()) setValidity(iasValue, IasValidity.RELIABLE)
else setValidity(iasValue, IasValidity.UNRELIABLE)
})
scalaCompWithNoPriority.get.update(activatedUnSetMPs4)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, false))
assert(scalaCompWithNoPriority.get.output.validityConstraint.nonEmpty)
val constraintIds4 = scalaCompWithNoPriority.get.output.validityConstraint.get
assert(constraintIds4.size==4)
assert(constraintIds4.forall(checkValidity(activatedUnSetMPs4,_,IasValidity.RELIABLE)))
}

it must "be empty when not enough UNSET inputs are reliable" in {
// Change all inputs to SET to trigger the TF
val changedMPs: Set[IASValue[?]] = inputsMPs.map ( iasio => iasio.updateValue(Some(Alarm.getInitialAlarmState.set())).updateProdTStamp(System.currentTimeMillis()).toIASValue())
// At the beginning all the inputs are UNRELIABLE
changedMPs.foreach( value => {assert(value.iasValidity==IasValidity.UNRELIABLE)})
scalaCompWithNoPriority.get.update(changedMPs)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, true))
assert(scalaCompWithNoPriority.get.output.validityConstraint.isEmpty)

// Activate 3 (threshold) alarms so the output is still SET
// but now UNSET alarms in input will be RELIABLE, SET UNRELIABLE
//
// We expect the output to be UNSET and RELIABLE (unreliable unset ignored)
val activated2 = activate(3)
val activatedSetMPs = activated2.map( iasValue => {
if (!iasValue.asInstanceOf[IASValue[Alarm]].value.isSet()) setValidity(iasValue, IasValidity.RELIABLE)
else setValidity(iasValue, IasValidity.UNRELIABLE)
})
scalaCompWithNoPriority.get.update(activatedSetMPs)
assert(checkAlarmActivation(scalaCompWithNoPriority.get, true))
assert(scalaCompWithNoPriority.get.output.validityConstraint.isEmpty)
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class Dasu7ASCEsTest extends AnyFlatSpec {
Set(v1,v2,v3,v4)
}
inputsProvider.sendInputs(setOfInputs)
// No the DASU has all the inputs and must produce the output
// Now the DASU has all the inputs and must produce the output
assert(iasValuesReceived.size==1)
val outputProducedByDasu = iasValuesReceived.last
assert(outputProducedByDasu.valueType==IASTypes.ALARM)
Expand All @@ -244,7 +244,7 @@ class Dasu7ASCEsTest extends AnyFlatSpec {
assert(iasValuesReceived.size==2)
val outputProducedByDasu2 = iasValuesReceived.last
assert(outputProducedByDasu2.valueType==IASTypes.ALARM)
assert(outputProducedByDasu2.value.asInstanceOf[Alarm]== Alarm.getInitialAlarmState.set())
assert(outputProducedByDasu2.value.asInstanceOf[Alarm].isSet())
assert(outputProducedByDasu2.productionTStamp.isPresent())

assert(outputProducedByDasu2.dependentsFullRuningIds.isPresent())
Expand Down Expand Up @@ -311,13 +311,12 @@ class Dasu7ASCEsTest extends AnyFlatSpec {

logger.info("Submits a set of RELIABLE inputs")
val inputs: Set[IASValue[_]] = Set(buildValue(inputTemperature1ID.id, inputTemperature1ID.fullRunningID,0))
val setOfInputs1: Set[IASValue[_]] = {
val v1=buildValue(inputTemperature1ID.id, inputTemperature1ID.fullRunningID,5)
val v2=buildValue(inputTemperature2ID.id, inputTemperature2ID.fullRunningID,6)
val v3=buildValue(inputTemperature3ID.id, inputTemperature3ID.fullRunningID,7)
val v4=buildValue(inputTemperature4ID.id, inputTemperature4ID.fullRunningID,8)
Set(v1,v2,v3,v4)
}
val setOfInputs1 = Set[IASValue[_]](
buildValue(inputTemperature1ID.id, inputTemperature1ID.fullRunningID,5),
buildValue(inputTemperature2ID.id, inputTemperature2ID.fullRunningID,6),
buildValue(inputTemperature3ID.id, inputTemperature3ID.fullRunningID,7),
buildValue(inputTemperature4ID.id, inputTemperature4ID.fullRunningID,8)
)
inputsProvider.sendInputs(setOfInputs1)

// wait to avoid the throttling to engage
Expand All @@ -332,6 +331,20 @@ class Dasu7ASCEsTest extends AnyFlatSpec {
inputsProvider.sendInputs(setOfInputs2)
// wait to avoid the throttling to engage
Thread.sleep(2*dasu.throttling)
// The DASU does notproduce any output because the Multiplicity TF takes
// into account the validity of only the inputs that are relevant
// @see https://github.com/IntegratedAlarmSystem-Group/ias/issues/201
assert(iasValuesReceived.size==0)

// Invalidate more inputs to trigger a change of validity of the output
val setOfInputs3 = Set[IASValue[_]](
buildValue(inputTemperature1ID.id, inputTemperature1ID.fullRunningID,5,UNRELIABLE),
buildValue(inputTemperature2ID.id, inputTemperature2ID.fullRunningID,5,UNRELIABLE),
buildValue(inputTemperature3ID.id, inputTemperature3ID.fullRunningID,5,UNRELIABLE)
)
inputsProvider.sendInputs(setOfInputs3)
// wait to avoid the throttling to engage
Thread.sleep(2*dasu.throttling)
assert(iasValuesReceived.size==1)
assert(iasValuesReceived.head.iasValidity == UNRELIABLE)
}
Expand Down
11 changes: 10 additions & 1 deletion KafkaUtils/src/main/python/IasKafkaUtils/KafkaValueConsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ def onLost(self):
logger.info("Partition lost")
self.subscribed = False

def isSubscribed(self) -> bool:
"""
Returns:
True if the consumer is subscribed to a partition,
False otherwise
"""
return self.subscribed

def run(self):
logger.info('Thread to poll for IasValues started')
try:
Expand All @@ -133,6 +141,7 @@ def run(self):
with self.watchDogLock:
self.watchDog = True
if msg is None or not self.subscribed:
logger.debug(f"Polling thread is subscribed { self.subscribed}")
continue

if msg.error() is not None:
Expand Down Expand Up @@ -175,7 +184,7 @@ def getWatchdog(self):
Return and reset the watch dog.
Returns:
bool: True if the waychdog has been set, False otherwise
bool: True if the watchdog has been set, False otherwise
"""
with self.watchDogLock:
ret = self.watchDog
Expand Down
Loading

0 comments on commit b031d07

Please sign in to comment.