Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline latency #33

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions doc/examples/tutorial/example9.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
/*
* Latency associated with a pipeline of tasks.
* Task tick start the pipeline every 24ms.
* Task task1 gets a time reference (bref) and wakes task2 up (after
* executing for a while). task2 wakes task3 up.
* Eitherr task1 or task3 (extremes of the pipeline), depending on
* who arrives last, will use bref to compute end-to-end latency
* (pipe_latency) in the log.
*/
"tasks" : {
"tick" : {
"loop" : -1,
"cpus" : [0],
"phases" : {
"p1" : {
"loop" : 1,
"resume" : "task1",
"timer" : { "ref" : "tick", "period": 6000 }
},
"p2" : {
"loop" : 3,
"timer" : { "ref" : "tick", "period": 6000 }
}
}
},
"task1" : {
"loop" : -1,
"bref" : "pipeline",
"run1" : 300,
"resume" : "task2",
"run2" : 4000,
"barrier" : "pipeline",
"suspend" : "task1"
},
"task2" : {
"loop" : -1,
"suspend" : "task2",
"run" : 1000,
"resume" : "task3"
},
"task3" : {
"loop" : -1,
"suspend" : "task3",
"run" : 1000,
"barrier" : "pipeline"
}
},
"global" : {
"default_policy" : "SCHED_OTHER",
"duration" : 6,
"ftrace" : true,
"gnuplot" : false,
"lock_pages" : false,
"logdir" : "./",
"log_basename" : "example9",
"calibration" : "CPU0"
}
}
25 changes: 16 additions & 9 deletions doc/tutorial.txt
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,12 @@ generates the following sequence:
"unlock" : "SyncPointA" (internal mutex)
}

* bref : String. Time reference for a pipeline of tasks. It has to match a
barrier (see above). The task at the beginning of the pipeline calls this
event to get such a reference; the last task at the other end of the pipeline
(the one that broadcasts and wakes everybody else up) will compute the delta.
See doc/examples/tutorial/example9.json for more information.

* suspend : String. Block the calling thread until another thread wakes it up
with resume. The String can be let empty as it will be filled by workgen with
the right thread's name before starting the use case.
Expand Down Expand Up @@ -626,19 +632,20 @@ metrics are:
- c_duration: sum of the configured duration of run/runtime events
- c_period: sum of the timer(s) period(s)
- wu_lat: sum of wakeup latencies after timer events
- pipe_lat: end-to-end latency of a pipeline of tasks (0 if no pipeline exists)

Below is an extract of a log:

# Policy : SCHED_OTHER priority : 0
#idx perf run period start end rel_st slack c_duration c_period wu_lat
0 92164 19935 98965 504549567051 504549666016 2443 78701 20000 100000 266
0 92164 19408 99952 504549666063 504549766015 101455 80217 20000 100000 265
0 92164 19428 99952 504549766062 504549866014 201454 80199 20000 100000 264
0 92164 19438 99955 504549866060 504549966015 301452 80190 20000 100000 265
0 92164 19446 99952 504549966061 504550066013 401453 80093 20000 100000 264
0 92164 19415 99953 504550066060 504550166013 501452 80215 20000 100000 263
0 92164 19388 99954 504550166059 504550266013 601451 80242 20000 100000 264
0 92164 19444 99956 504550266060 504550366015 701452 80185 20000 100000 265
#idx perf run period start end rel_st slack c_duration c_period wu_lat pipe_lat
0 92164 19935 98965 504549567051 504549666016 2443 78701 20000 100000 266 0
0 92164 19408 99952 504549666063 504549766015 101455 80217 20000 100000 265 0
0 92164 19428 99952 504549766062 504549866014 201454 80199 20000 100000 264 0
0 92164 19438 99955 504549866060 504549966015 301452 80190 20000 100000 265 0
0 92164 19446 99952 504549966061 504550066013 401453 80093 20000 100000 264 0
0 92164 19415 99953 504550066060 504550166013 501452 80215 20000 100000 263 0
0 92164 19388 99954 504550166059 504550266013 601451 80242 20000 100000 264 0
0 92164 19444 99956 504550266060 504550366015 701452 80185 20000 100000 265 0

Some gnuplot files are also created to generate charts based on the log files
for each thread and for each kind of metrics. The format of the chart that
Expand Down
39 changes: 26 additions & 13 deletions src/rt-app.c
Original file line number Diff line number Diff line change
Expand Up @@ -286,18 +286,29 @@ static int run_event(event_data_t *event, int dry_run,
pthread_cond_signal(&(rdata->res.cond.obj));
break;
case rtapp_barrier:
log_debug("barrier %s ", rdata->name);
pthread_mutex_lock(&(rdata->res.barrier.m_obj));
if (rdata->res.barrier.waiting == 0) {
/* everyone is already waiting, signal */
pthread_cond_broadcast(&(rdata->res.barrier.c_obj));
} else {
/* not everyone is waiting, mark then wait */
rdata->res.barrier.waiting -= 1;
pthread_cond_wait(&(rdata->res.barrier.c_obj), &(rdata->res.barrier.m_obj));
rdata->res.barrier.waiting += 1;
{
struct timespec t_delta;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a bit nitpicky, so feel free to ignore it, but you could put t_delta inside the if condition to avoid the additional {} indent.

log_debug("barrier %s ", rdata->name);
pthread_mutex_lock(&(rdata->res.barrier.m_obj));
if (rdata->res.barrier.waiting == 0) {
/* everyone is already waiting, signal */
pthread_cond_broadcast(&(rdata->res.barrier.c_obj));
clock_gettime(CLOCK_MONOTONIC, &t_delta);
t_delta = timespec_sub(&t_delta, &rdata->res.barrier.t_ref);
ldata->pipe_latency = timespec_to_usec(&t_delta);
log_debug("pipeline duration %lu ", ldata->pipe_latency);
} else {
/* not everyone is waiting, mark then wait */
rdata->res.barrier.waiting -= 1;
pthread_cond_wait(&(rdata->res.barrier.c_obj), &(rdata->res.barrier.m_obj));
rdata->res.barrier.waiting += 1;
}
pthread_mutex_unlock(&(rdata->res.barrier.m_obj));
}
pthread_mutex_unlock(&(rdata->res.barrier.m_obj));
break;
case rtapp_bref:
log_debug("bref %s ", rdata->name);
clock_gettime(CLOCK_MONOTONIC, &rdata->res.barrier.t_ref);
break;
case rtapp_sig_and_wait:
log_debug("signal and wait %s", rdata->name);
Expand Down Expand Up @@ -762,10 +773,11 @@ void *thread_body(void *arg)

log_notice("[%d] starting thread ...\n", data->ind);

fprintf(data->log_handler, "%s %8s %8s %8s %15s %15s %15s %10s %10s %10s %10s\n",
fprintf(data->log_handler, "%s %8s %8s %8s %15s %15s %15s %10s %10s %10s %10s %10s\n",
"#idx", "perf", "run", "period",
"start", "end", "rel_st", "slack",
"c_duration", "c_period", "wu_lat");
"c_duration", "c_period", "wu_lat",
"pipe_lat");

if (opts.ftrace)
log_ftrace(ft_data.marker_fd, "[%d] starts", data->ind);
Expand Down Expand Up @@ -836,6 +848,7 @@ void *thread_body(void *arg)
curr_timing->duration = ldata.duration;
curr_timing->perf = ldata.perf;
curr_timing->wu_latency = ldata.wu_latency;
curr_timing->pipe_latency = ldata.pipe_latency;
curr_timing->slack = ldata.slack;
curr_timing->c_period = ldata.c_period;
curr_timing->c_duration = ldata.c_duration;
Expand Down
19 changes: 19 additions & 0 deletions src/rt-app_parse_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ static void init_barrier_resource(rtapp_resource_t *data, const rtapp_options_t
&data->res.barrier.m_attr);

pthread_cond_init(&data->res.barrier.c_obj, NULL);
data->res.barrier.t_ref = usec_to_timespec(0UL);
}

static void
Expand Down Expand Up @@ -517,6 +518,23 @@ parse_thread_event_data(char *name, struct json_object *obj,
return;
}

if (!strncmp(name, "bref", strlen("bref"))) {

if (!json_object_is_type(obj, json_type_string))
goto unknown_event;

data->type = rtapp_bref;

ref = json_object_get_string(obj);
i = get_resource_index(ref, rtapp_barrier, opts);

data->res = i;
rdata = &(opts->resources[data->res]);

log_info(PIN2 "type %d target %s [%d]", data->type, rdata->name, rdata->index);
return;
}

if (!strncmp(name, "timer", strlen("timer"))) {

tmp = get_string_value_from(obj, "ref", TRUE, "unknown");
Expand Down Expand Up @@ -631,6 +649,7 @@ static char *events[] = {
"iorun",
"yield",
"barrier",
"bref",
NULL
};

Expand Down
7 changes: 6 additions & 1 deletion src/rt-app_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ typedef enum resource_t
rtapp_iorun,
rtapp_runtime,
rtapp_yield,
rtapp_barrier
rtapp_barrier,
rtapp_bref
} resource_t;

struct _rtapp_mutex {
Expand All @@ -97,6 +98,8 @@ struct _rtapp_barrier_like {
int waiting;
/* condvar to wait/signal on */
pthread_cond_t c_obj;
/* time reference for tasks pipeline */
struct timespec t_ref;
};

struct _rtapp_signal {
Expand Down Expand Up @@ -200,6 +203,7 @@ typedef struct _log_data_t {
unsigned long perf;
unsigned long duration;
unsigned long wu_latency;
unsigned long pipe_latency;
unsigned long c_duration;
unsigned long c_period;
long slack;
Expand Down Expand Up @@ -241,6 +245,7 @@ typedef struct _timing_point_t {
unsigned long c_duration;
unsigned long c_period;
unsigned long wu_latency;
unsigned long pipe_latency;
long slack;
__u64 start_time;
__u64 end_time;
Expand Down
5 changes: 3 additions & 2 deletions src/rt-app_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void
log_timing(FILE *handler, timing_point_t *t)
{
fprintf(handler,
"%4d %8lu %8lu %8lu %15llu %15llu %15llu %10ld %10lu %10lu %10lu",
"%4d %8lu %8lu %8lu %15llu %15llu %15llu %10ld %10lu %10lu %10lu %10lu",
t->ind,
t->perf,
t->duration,
Expand All @@ -156,7 +156,8 @@ log_timing(FILE *handler, timing_point_t *t)
t->slack,
t->c_duration,
t->c_period,
t->wu_latency
t->wu_latency,
t->pipe_latency
);
fprintf(handler, "\n");
}
Expand Down