diff --git a/lfc/core/src/main/java/org/lflang/AttributeUtils.java b/lfc/core/src/main/java/org/lflang/AttributeUtils.java index 741058ec..759ccc04 100644 --- a/lfc/core/src/main/java/org/lflang/AttributeUtils.java +++ b/lfc/core/src/main/java/org/lflang/AttributeUtils.java @@ -26,6 +26,7 @@ package org.lflang; import static org.lflang.ast.ASTUtils.factory; +import static org.lflang.ast.ASTUtils.toInteger; import java.util.HashMap; import java.util.List; @@ -290,6 +291,24 @@ public static List getInterfaceAttributes(Instantiation node) { return findAttributesByNameStartingWith(node, "interface"); } + public static int getMaxNumberOfPendingEvents(Action node) { + Attribute attr = findAttributeByName(node, "max_pending_events"); + if (attr != null) { + return Integer.valueOf(attr.getAttrParms().get(0).getValue()); + } else { + return -1; + } + } + + public static int getConnectionBufferSize(Connection node) { + Attribute attr = findAttributeByName(node, "buffer"); + if (attr != null) { + return Integer.valueOf(attr.getAttrParms().get(0).getValue()); + } else { + return -1; + } + } + public static Attribute getLinkAttribute(Connection node) { return findAttributeByName(node, "link"); } diff --git a/lfc/core/src/main/java/org/lflang/validation/AttributeSpec.java b/lfc/core/src/main/java/org/lflang/validation/AttributeSpec.java index 2bdcaf2e..dd77ba9f 100644 --- a/lfc/core/src/main/java/org/lflang/validation/AttributeSpec.java +++ b/lfc/core/src/main/java/org/lflang/validation/AttributeSpec.java @@ -243,6 +243,14 @@ enum AttrParamType { ATTRIBUTE_SPECS_BY_NAME.put( "_networkReactor", new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.STRING, false)))); + // @max_pending_event() + ATTRIBUTE_SPECS_BY_NAME.put( + "max_pending_events", + new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.INT, false)))); + // @buffer() + ATTRIBUTE_SPECS_BY_NAME.put( + "buffer", + new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.INT, false)))); // @interface:tcp(name="string", address="string") e.g. @interface:tcp(name="if1", address="127.0.0.1") ATTRIBUTE_SPECS_BY_NAME.put( "interface_tcp", diff --git a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcActionGenerator.kt b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcActionGenerator.kt index a473c67d..1688d7f5 100644 --- a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcActionGenerator.kt +++ b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcActionGenerator.kt @@ -9,13 +9,17 @@ import org.lflang.generator.uc.UcReactorGenerator.Companion.codeType import org.lflang.generator.uc.UcReactorGenerator.Companion.getEffects import org.lflang.generator.uc.UcReactorGenerator.Companion.getObservers import org.lflang.generator.uc.UcReactorGenerator.Companion.getSources +import org.lflang.AttributeUtils.getMaxNumberOfPendingEvents import org.lflang.lf.* class UcActionGenerator(private val reactor: Reactor) { companion object { public val Action.maxNumPendingEvents - get(): Int = 12 // FIXME: This should be annotated in the LF code + get(): Int { + val num = getMaxNumberOfPendingEvents(this) + return if (num > 0) num else 1 + } } /** Returns the C Enum representing the type of action.*/ diff --git a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcConnectionUtils.kt b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcConnectionUtils.kt index 9688358b..f72878f5 100644 --- a/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcConnectionUtils.kt +++ b/lfc/core/src/main/kotlin/org/lflang/generator/uc/UcConnectionUtils.kt @@ -1,6 +1,6 @@ package org.lflang.generator.uc -import org.lflang.AttributeUtils.getLinkAttribute +import org.lflang.AttributeUtils.* import org.lflang.generator.orNever import org.lflang.generator.uc.UcInstanceGenerator.Companion.codeWidth import org.lflang.generator.uc.UcInstanceGenerator.Companion.width @@ -58,7 +58,7 @@ open class UcGroupedConnection( channels.groupingBy { Pair(it.src.getCodePortIdx(), it.src.getCodeBankIdx()) }.eachCount() frequencyMap.values.maxOrNull() ?: 0 } - val maxNumPendingEvents = 16 // FIXME: Must be derived from the program + val maxNumPendingEvents = if (getConnectionBufferSize(lfConn) > 0) getConnectionBufferSize(lfConn) else 1 fun assignUid(id: Int) { uid = id diff --git a/src/action.c b/src/action.c index 491dfa70..76a7cc7b 100644 --- a/src/action.c +++ b/src/action.c @@ -52,7 +52,7 @@ lf_ret_t Action_schedule(Action *self, interval_t offset, const void *value) { } if (self->events_scheduled >= self->max_pending_events) { - LF_ERR(TRIG, "Too many pending events. Capacity is %i", self->max_pending_events); + LF_ERR(TRIG, "Action event buffer is full, dropping event. Capacity is %i", self->max_pending_events); return LF_ERR; } diff --git a/src/connection.c b/src/connection.c index 4a97255a..aa15fff8 100644 --- a/src/connection.c +++ b/src/connection.c @@ -142,7 +142,11 @@ void DelayedConnection_trigger_downstreams(Connection *_self, const void *value, EventPayloadPool *pool = trigger->payload_pool; if (self->staged_payload_ptr == NULL) { ret = pool->allocate(pool, &self->staged_payload_ptr); - validate(ret == LF_OK); // FIME: Trigger_downstreams should return lf_ret_t + if (ret != LF_OK) { + LF_ERR(CONN, "No more space in event buffer for delayed connection %p, dropping. Capacity is %d", _self, + self->payload_pool.capacity); + return; + } } memcpy(self->staged_payload_ptr, value, value_size); sched->register_for_cleanup(sched, &_self->super); diff --git a/test/lf/src/Action.lf b/test/lf/src/Action.lf index ab0f254f..2064894c 100644 --- a/test/lf/src/Action.lf +++ b/test/lf/src/Action.lf @@ -3,6 +3,7 @@ target uC { } main reactor { + @max_pending_events(10) logical action a:int state cnt:int=0 state cnt2:int=0 diff --git a/test/lf/src/ActionBuffer.lf b/test/lf/src/ActionBuffer.lf new file mode 100644 index 00000000..579afdda --- /dev/null +++ b/test/lf/src/ActionBuffer.lf @@ -0,0 +1,26 @@ +target uC { + platform: Native +} + +main reactor { + + @max_pending_events(10) + logical action a:int + + state received:int=0 + + reaction(startup) -> a {= + for (int i = 0; i<10; i++) { + lf_schedule(a, MSEC(i), i); + } + =} + + reaction(a) {= + validate(a->value == self->received++); + printf("Action scheduled wtih %d\n", a->value); + =} + + reaction(shutdown) {= + validate(self->received == 10); + =} +} diff --git a/test/lf/src/Connections.lf b/test/lf/src/Connections.lf index dfa29a64..68b2787b 100644 --- a/test/lf/src/Connections.lf +++ b/test/lf/src/Connections.lf @@ -19,25 +19,32 @@ reactor R1 { } -reactor R2 { +reactor R2(expected: int = 0) { input in:int + state cnt:int = 0 + reaction(startup) {= printf("Hi from other guy at startup\n"); - =} reaction(in) {= printf("Got %d\n", in->value); + self->cnt++; + =} + + reaction(shutdown) {= + validate(self->cnt == self->expected); =} } main reactor { r1 = new R1() - r2 = new R2() + r2 = new R2(expected = 11) r1.out -> r2.in r3 = new R1() - r4 = new R2() - r3.out -> r4.in after 100 msec + r4 = new R2(expected = 10) + + r3.out -> r4.in after 10 msec } diff --git a/test/lf/src/DelayedConnectionBuffer.lf b/test/lf/src/DelayedConnectionBuffer.lf new file mode 100644 index 00000000..5ad7dc43 --- /dev/null +++ b/test/lf/src/DelayedConnectionBuffer.lf @@ -0,0 +1,38 @@ +target uC { + platform: Native, + timeout: 200 msec +} + +reactor R1 { + output out: int + timer t(0, 10 msec) + state cnt: int = 0; + + reaction(t) -> out {= + if (self->cnt < 10) { + printf("Hello from R1 at %ld\n", env->get_elapsed_physical_time(env)); + lf_set(out, self->cnt++); + } + =} +} + +reactor R2 { + input in:int + state cnt:int = 0 + reaction(in) {= + printf("Got %d\n", in->value); + validate(in->value == self->cnt++); + =} + + reaction(shutdown) {= + validate(self->cnt == 10); + =} +} + +main reactor { + r1 = new R1() + r2 = new R2() + + @buffer(10) + r1.out -> r2.in after 100 msec +}