Skip to content

Commit

Permalink
Attributes for action and connection buffers (#198)
Browse files Browse the repository at this point in the history
* This PR adds attributes for specifying the max num pending events for actions and the max buffer size for connections

* Simple formatting
  • Loading branch information
erlingrj authored Jan 24, 2025
1 parent c84cb92 commit dc632df
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 10 deletions.
19 changes: 19 additions & 0 deletions lfc/core/src/main/java/org/lflang/AttributeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
package org.lflang;

import static org.lflang.ast.ASTUtils.factory;
import static org.lflang.ast.ASTUtils.toInteger;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -290,6 +291,24 @@ public static List<Attribute> getInterfaceAttributes(Instantiation node) {
return findAttributesByNameStartingWith(node, "interface");
}

public static int getMaxNumberOfPendingEvents(Action node) {
Attribute attr = findAttributeByName(node, "max_pending_events");
if (attr != null) {
return Integer.valueOf(attr.getAttrParms().get(0).getValue());
} else {
return -1;
}
}

public static int getConnectionBufferSize(Connection node) {
Attribute attr = findAttributeByName(node, "buffer");
if (attr != null) {
return Integer.valueOf(attr.getAttrParms().get(0).getValue());
} else {
return -1;
}
}

public static Attribute getLinkAttribute(Connection node) {
return findAttributeByName(node, "link");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,14 @@ enum AttrParamType {
ATTRIBUTE_SPECS_BY_NAME.put(
"_networkReactor",
new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.STRING, false))));
// @max_pending_event(<value>)
ATTRIBUTE_SPECS_BY_NAME.put(
"max_pending_events",
new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.INT, false))));
// @buffer(<value>)
ATTRIBUTE_SPECS_BY_NAME.put(
"buffer",
new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.INT, false))));
// @interface:tcp(name="string", address="string") e.g. @interface:tcp(name="if1", address="127.0.0.1")
ATTRIBUTE_SPECS_BY_NAME.put(
"interface_tcp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import org.lflang.generator.uc.UcReactorGenerator.Companion.codeType
import org.lflang.generator.uc.UcReactorGenerator.Companion.getEffects
import org.lflang.generator.uc.UcReactorGenerator.Companion.getObservers
import org.lflang.generator.uc.UcReactorGenerator.Companion.getSources
import org.lflang.AttributeUtils.getMaxNumberOfPendingEvents
import org.lflang.lf.*

class UcActionGenerator(private val reactor: Reactor) {

companion object {
public val Action.maxNumPendingEvents
get(): Int = 12 // FIXME: This should be annotated in the LF code
get(): Int {
val num = getMaxNumberOfPendingEvents(this)
return if (num > 0) num else 1
}
}

/** Returns the C Enum representing the type of action.*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.lflang.generator.uc

import org.lflang.AttributeUtils.getLinkAttribute
import org.lflang.AttributeUtils.*
import org.lflang.generator.orNever
import org.lflang.generator.uc.UcInstanceGenerator.Companion.codeWidth
import org.lflang.generator.uc.UcInstanceGenerator.Companion.width
Expand Down Expand Up @@ -58,7 +58,7 @@ open class UcGroupedConnection(
channels.groupingBy { Pair(it.src.getCodePortIdx(), it.src.getCodeBankIdx()) }.eachCount()
frequencyMap.values.maxOrNull() ?: 0
}
val maxNumPendingEvents = 16 // FIXME: Must be derived from the program
val maxNumPendingEvents = if (getConnectionBufferSize(lfConn) > 0) getConnectionBufferSize(lfConn) else 1

fun assignUid(id: Int) {
uid = id
Expand Down
2 changes: 1 addition & 1 deletion src/action.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ lf_ret_t Action_schedule(Action *self, interval_t offset, const void *value) {
}

if (self->events_scheduled >= self->max_pending_events) {
LF_ERR(TRIG, "Too many pending events. Capacity is %i", self->max_pending_events);
LF_ERR(TRIG, "Action event buffer is full, dropping event. Capacity is %i", self->max_pending_events);
return LF_ERR;
}

Expand Down
6 changes: 5 additions & 1 deletion src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ void DelayedConnection_trigger_downstreams(Connection *_self, const void *value,
EventPayloadPool *pool = trigger->payload_pool;
if (self->staged_payload_ptr == NULL) {
ret = pool->allocate(pool, &self->staged_payload_ptr);
validate(ret == LF_OK); // FIME: Trigger_downstreams should return lf_ret_t
if (ret != LF_OK) {
LF_ERR(CONN, "No more space in event buffer for delayed connection %p, dropping. Capacity is %d", _self,
self->payload_pool.capacity);
return;
}
}
memcpy(self->staged_payload_ptr, value, value_size);
sched->register_for_cleanup(sched, &_self->super);
Expand Down
1 change: 1 addition & 0 deletions test/lf/src/Action.lf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ target uC {
}

main reactor {
@max_pending_events(10)
logical action a:int
state cnt:int=0
state cnt2:int=0
Expand Down
26 changes: 26 additions & 0 deletions test/lf/src/ActionBuffer.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
target uC {
platform: Native
}

main reactor {

@max_pending_events(10)
logical action a:int

state received:int=0

reaction(startup) -> a {=
for (int i = 0; i<10; i++) {
lf_schedule(a, MSEC(i), i);
}
=}

reaction(a) {=
validate(a->value == self->received++);
printf("Action scheduled wtih %d\n", a->value);
=}

reaction(shutdown) {=
validate(self->received == 10);
=}
}
17 changes: 12 additions & 5 deletions test/lf/src/Connections.lf
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,32 @@ reactor R1 {
}


reactor R2 {
reactor R2(expected: int = 0) {
input in:int

state cnt:int = 0

reaction(startup) {=
printf("Hi from other guy at startup\n");

=}

reaction(in) {=
printf("Got %d\n", in->value);
self->cnt++;
=}

reaction(shutdown) {=
validate(self->cnt == self->expected);
=}
}

main reactor {
r1 = new R1()
r2 = new R2()
r2 = new R2(expected = 11)
r1.out -> r2.in

r3 = new R1()
r4 = new R2()
r3.out -> r4.in after 100 msec
r4 = new R2(expected = 10)

r3.out -> r4.in after 10 msec
}
38 changes: 38 additions & 0 deletions test/lf/src/DelayedConnectionBuffer.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
target uC {
platform: Native,
timeout: 200 msec
}

reactor R1 {
output out: int
timer t(0, 10 msec)
state cnt: int = 0;

reaction(t) -> out {=
if (self->cnt < 10) {
printf("Hello from R1 at %ld\n", env->get_elapsed_physical_time(env));
lf_set(out, self->cnt++);
}
=}
}

reactor R2 {
input in:int
state cnt:int = 0
reaction(in) {=
printf("Got %d\n", in->value);
validate(in->value == self->cnt++);
=}

reaction(shutdown) {=
validate(self->cnt == 10);
=}
}

main reactor {
r1 = new R1()
r2 = new R2()

@buffer(10)
r1.out -> r2.in after 100 msec
}

0 comments on commit dc632df

Please sign in to comment.