Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update calculate_results to calculate statistics excluding trimmed samples #12

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions common/normalizedtestrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (tr *TestRun) NormalizedConfigWithAgentData(
cpu,
SystemRoleAtomizerCliWatchtower,
SystemRoleTwoPhaseGen,
SystemRolePhaseTwoGen,
SystemRoleParsecGen,
),
)
trc.SentinelCPU = int(
Expand Down Expand Up @@ -198,7 +198,7 @@ func (tr *TestRun) NormalizedConfigWithAgentData(
ram,
SystemRoleAtomizerCliWatchtower,
SystemRoleTwoPhaseGen,
SystemRolePhaseTwoGen,
SystemRoleParsecGen,
) / 1024,
)
trc.SentinelRAM = int(
Expand Down Expand Up @@ -236,7 +236,7 @@ func (tr *TestRun) NormalizedConfigWithAgentData(
count,
SystemRoleAtomizerCliWatchtower,
SystemRoleTwoPhaseGen,
SystemRolePhaseTwoGen,
SystemRoleParsecGen,
)
trc.SentinelCount = getIntValueForAnyKey(
count,
Expand Down
10 changes: 5 additions & 5 deletions common/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const SystemRoleTwoPhaseGen SystemRole = "twophase-gen"
const SystemRoleAgent SystemRole = "agent"
const SystemRoleRuntimeLockingShard SystemRole = "runtime_locking_shard"
const SystemRoleTicketMachine SystemRole = "ticket_machine"
const SystemRolePhaseTwoGen SystemRole = "phasetwo_bench"
const SystemRoleParsecGen SystemRole = "parsec_bench"
const SystemRoleLoadGen SystemRole = "loadgen"

type SystemArchitectureRole struct {
Expand Down Expand Up @@ -223,8 +223,8 @@ var AvailableArchitectures = []SystemArchitecture{
},
},
{
ID: "phase-two",
Name: "Phase Two Programmability",
ID: "parsec",
Name: "PArSEC",
Roles: []SystemArchitectureRole{
{
Role: SystemRoleAgent,
Expand All @@ -242,7 +242,7 @@ var AvailableArchitectures = []SystemArchitecture{
ShortTitle: "Ticketer",
},
{
Role: SystemRolePhaseTwoGen,
Role: SystemRoleParsecGen,
Title: "Generator",
ShortTitle: "Gen",
},
Expand All @@ -261,7 +261,7 @@ var AvailableArchitectures = []SystemArchitecture{
RaftMaxBatch: 100000,
SnapshotDistance: 0,
ShardReplicationFactor: 3,
Architecture: "phase-two",
Architecture: "parsec",
SampleCount: 315,
LoadGenOutputCount: 2,
LoadGenInputCount: 2,
Expand Down
45 changes: 25 additions & 20 deletions coordinator/scripts/calculate_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ def process_lats(lats):
df['latsS'] = df.lats / 10**3
df['pDate'] = df.time.values.astype('datetime64[ns]')
df = df[df.time > 1609459200000] # Filter out (corrupt) times before 2021
# trim first <trim_samples> samples from dataframe
if 'TRIM_SAMPLES' in environ:
trim_samples = int(environ['TRIM_SAMPLES'])
df = df[df.time > df[0][0] + trim_samples*(block_time_ms / 1000)*one_sec]
dat = df.groupby(by=MyBinnerTime(expression=df.pDate, resolution='s', df=df, label='pDate'), agg={'count': 'count', 'lats': vaex.agg.list('lats')})
dat['lats'] = dat['lats'].apply(process_lats)

Expand Down Expand Up @@ -236,7 +240,7 @@ def process_lats(lats):
lat_99999.append(tps_its[2][1][idx][2])
idx += 1
current += datetime.timedelta(seconds=1)

dat = df.groupby(df.latsS, agg='count')
lats = dat.values
lat_max = df.max(df.latsS)
Expand All @@ -262,11 +266,11 @@ def process_lats(lats):
lat_lines.append({"lats":lat_99999, "title":"99.999%", "freq": 1})

periods = []

idx = 0
tps_target_files = [join('outputs',x) for x in listdir('outputs') \
if 'tps_target_' in x and 'hdf5' not in x]
if len(tps_target_files) > 0:
if len(tps_target_files) > 0:
t_index = pandas.date_range(start=begin- datetime.timedelta(seconds=5), end=end, freq='1s')
exports = 0
for f in tps_target_files:
Expand All @@ -285,7 +289,7 @@ def process_lats(lats):
exports = exports + 1
else:
print('{} has no rows', f)

if exports > 0:
df2 = vaex.open('outputs/*-tps_target_*.txt.hdf5')
df2['pDate'] = df2['index']
Expand All @@ -296,11 +300,11 @@ def process_lats(lats):
dat3.drop('tps_target', inplace=True)
dat3.drop('pDate', inplace=True)
dat2.join(dat3, inplace=True)

its = dat2.to_items()
tps_target = make_tps_target_series_line(its, begin, end, (lambda its,idx: its[0][1][idx].astype(datetime.datetime)), (lambda its,idx: its[1][1][idx]))
periods = extract_tps_target_periods(its, begin, end, (lambda its,idx: its[0][1][idx].astype(datetime.datetime)), (lambda its,idx: its[1][1][idx]), (lambda its,idx: its[3][1][idx]))

tps_lines.append({"tps":tps_target, "title":"Loadgen target", "freq": 1, "ma": False})

prev_lat99 = 0
Expand All @@ -326,6 +330,13 @@ def process_lats(lats):
elbow_latmean.append(prev_latmean)
elbow_lat99.append(prev_lat99)
elbow_lat99999.append(prev_lat99999)
elif 'TRIM_SAMPLES' in environ :
## Lob off (configurable) more "warm up" samples
trim_samples = int(environ['TRIM_SAMPLES'])
for i in range(len(tps_lines)):
tps_lines[i]["tps"] = tps_lines[i]["tps"][trim_samples:]
for i in range(len(lat_lines)):
lat_lines[i]["lats"] = lat_lines[i]["lats"][trim_samples:]
Comment on lines +336 to +339
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maurermi my python is a bit rusty. I was expecting this code to trim the first trim_samples samples from the set, but that doesn't appear to be what's happening when I go to test (the beginning of the test is preserved, and clearly the last trim_samples samples are removed).

It's pretty clear the previous version of the code did exactly that, but I'm not sure why. Perhaps this should be merged (since it correctly applies the trimming to the stats and charts), and we should consider offering a manual trim-from-begin and trim-from-end inputs? (would mirror the auto-trim-zeros split well)


if archiver_based:
for output_file in output_files:
Expand Down Expand Up @@ -356,13 +367,7 @@ def process_lats(lats):
while len(lat_lines[i]["lats"]) > 0 and int(lat_lines[i]["lats"][-1]) == 0:
lat_lines[i]["lats"].pop()

## Lob off (configurable) more "warm up" samples
if 'TRIM_SAMPLES' in environ:
trim_samples = int(environ['TRIM_SAMPLES'])
for i in range(len(tps_lines)):
tps_lines[i]["tps"] = tps_lines[i]["tps"][trim_samples:]
for i in range(len(lat_lines)):
lat_lines[i]["lats"] = lat_lines[i]["lats"][trim_samples:]



## Create throughput histogram
Expand Down Expand Up @@ -480,7 +485,7 @@ def dev_to_val(dev):
if len(colors) > j:
color = colors[j]
ax.plot(tps_time, tps_ma, label='{} ({}ms MA)'.format(tps_line["title"],tps_ma_ms), color=color)



max = max * 1.02
Expand Down Expand Up @@ -572,7 +577,7 @@ def dev_to_val(dev):
if len(markers) > i:
marker = markers[i]
ax.plot(elbow_tps, yy, label=titles[i], color=color, marker=marker)

max = max * 1.02

ax.set_ylabel('Latency (ms)')
Expand All @@ -584,13 +589,13 @@ def dev_to_val(dev):
# TODO: Find proper way of finding peak TPS range. None of this is working
# accurately
# for yy in y:
# delta_ma_tmp = []
# delta_ma_tmp = []
# pf_x = x
# pf_y = yy
# while math.isnan(pf_y[-1]):
# pf_y = pf_y[:-1]
# pf_x = pf_x[:-1]

# while math.isnan(pf_y[1]):
# pf_y = pf_y[1:]
# pf_x = pf_x[1:]
Expand Down Expand Up @@ -625,16 +630,16 @@ def dev_to_val(dev):
# peak_lb_idx = 0
# if peak_ub_idx < 0:
# peak_ub_idx = 0

# peak_lb = pf_x[peak_lb_idx]
# peak_ub = pf_x[peak_ub_idx]
# peak_found = True
# if delta_ma_above < 8:
# peak_found = False

# if peak_found:
# break

# if peak_ub > 0:
# ax.set_title('Latency/Throughput Elbow\nDetected peak {}-{} TX/s'.format(peak_lb, peak_ub))

Expand Down
6 changes: 3 additions & 3 deletions coordinator/sources/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,22 +249,22 @@ func (s *SourcesManager) Compile(
proxy_path := filepath.Join(
sourcesDir(),
"src",
"3pc",
"parsec",
"agent",
"runners",
"evm",
"rpc_proxy",
)
if _, err := os.Stat(proxy_path); !os.IsNotExist(err) {
logging.Infof(
"[Compile %s-%t]: Copying 3PC/EVM RPC proxy",
"[Compile %s-%t]: Copying parsec/EVM RPC proxy",
hash,
profilingOrDebugging,
)
dest_proxy_path := filepath.Join(
binariesPath,
"src",
"3pc",
"parsec",
"agent",
"runners",
"evm",
Expand Down
26 changes: 13 additions & 13 deletions coordinator/testruns/architecture_phase_two.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ import (
"github.com/mit-dci/opencbdc-tctl/common"
)

func (t *TestRunManager) IsPhaseTwo(architectureID string) bool {
return strings.HasPrefix(architectureID, "phase-two")
func (t *TestRunManager) IsParsec(architectureID string) bool {
return strings.HasPrefix(architectureID, "parsec")
}

// RunBinariesPhaseTwo will orchestrate the running of all roles for a full
// cycle test with the phase two architecture
func (t *TestRunManager) RunBinariesPhaseTwo(
// RunBinariesParsec will orchestrate the running of all roles for a full
// cycle test with the PArSEC architecture
func (t *TestRunManager) RunBinariesParsec(
tr *common.TestRun,
envs map[int32][]byte,
cmd chan *common.ExecutedCommand,
failures chan *common.ExecutedCommand,
) error {
// Build the sequence of commands to start
startSequence := t.CreateStartSequencePhaseTwo(tr)
startSequence := t.CreateStartSequenceParsec(tr)

// Execute the sequence of commands to start
allCmds, terminated, err := t.executeStartSequence(
Expand Down Expand Up @@ -76,7 +76,7 @@ func (t *TestRunManager) RunBinariesPhaseTwo(
case <-time.After(timeout):
}

err = t.CleanupCommandsPhaseTwo(tr, allCmds, envs)
err = t.CleanupCommandsParsec(tr, allCmds, envs)
if err != nil {
return err
}
Expand All @@ -87,7 +87,7 @@ func (t *TestRunManager) RunBinariesPhaseTwo(
return nil
}

func (t *TestRunManager) CleanupCommandsPhaseTwo(
func (t *TestRunManager) CleanupCommandsParsec(
tr *common.TestRun,
allCmds []runningCommand,
envs map[int32][]byte,
Expand All @@ -109,7 +109,7 @@ func (t *TestRunManager) CleanupCommandsPhaseTwo(
t.WriteLog(tr, "Interrupting all loadgens")
err := t.BreakAllCmds(
tr,
t.FilterCommandsByRole(tr, allCmds, common.SystemRolePhaseTwoGen),
t.FilterCommandsByRole(tr, allCmds, common.SystemRoleParsecGen),
)
if err != nil {
return err
Expand All @@ -132,7 +132,7 @@ func (t *TestRunManager) CleanupCommandsPhaseTwo(
t.WriteLog(tr, "Terminating all loadgens")
err = t.TerminateAllCmds(
tr,
t.FilterCommandsByRole(tr, allCmds, common.SystemRolePhaseTwoGen),
t.FilterCommandsByRole(tr, allCmds, common.SystemRoleParsecGen),
)
if err != nil {
return err
Expand Down Expand Up @@ -183,11 +183,11 @@ func (t *TestRunManager) CleanupCommandsPhaseTwo(
return nil
}

// CreateStartSequencePhaseTwo uses the test run configuration to determine in
// CreateStartSequenceParsec uses the test run configuration to determine in
// which sequence the agent roles should be started, and returns an array of
// startSequenceEntry elements that are ordered in the sequence in which they
// should be started up.
func (t *TestRunManager) CreateStartSequencePhaseTwo(
func (t *TestRunManager) CreateStartSequenceParsec(
tr *common.TestRun,
) []startSequenceEntry {
// Determine the start sequence
Expand Down Expand Up @@ -236,7 +236,7 @@ func (t *TestRunManager) CreateStartSequencePhaseTwo(

// Start all load generators
startSequence = append(startSequence, startSequenceEntry{
roles: t.GetAllRolesSorted(tr, common.SystemRolePhaseTwoGen),
roles: t.GetAllRolesSorted(tr, common.SystemRoleParsecGen),
timeout: roleStartTimeout,
waitForPort: []PortIncrement{}, // Don't wait for anything - loadgens don't accept incoming
})
Expand Down
14 changes: 7 additions & 7 deletions coordinator/testruns/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ var roleBinaries = map[common.SystemRole]string{
common.SystemRoleSentinelTwoPhase: "sources/build/src/uhs/twophase/sentinel_2pc/sentineld-2pc",
common.SystemRoleAtomizerCliWatchtower: "sources/build/tools/bench/atomizer-cli-watchtower",
common.SystemRoleTwoPhaseGen: "sources/build/tools/bench/twophase-gen",
common.SystemRoleAgent: "sources/build/src/3pc/agent/agentd",
common.SystemRoleRuntimeLockingShard: "sources/build/src/3pc/runtime_locking_shard/runtime_locking_shardd",
common.SystemRoleTicketMachine: "sources/build/src/3pc/ticket_machine/ticket_machined",
common.SystemRolePhaseTwoGen: "sources/build/tools/bench/3pc/evm/evm_bench",
common.SystemRoleAgent: "sources/build/src/parsec/agent/agentd",
common.SystemRoleRuntimeLockingShard: "sources/build/src/parsec/runtime_locking_shard/runtime_locking_shardd",
common.SystemRoleTicketMachine: "sources/build/src/parsec/ticket_machine/ticket_machined",
common.SystemRoleParsecGen: "sources/build/tools/bench/parsec/evm/evm_bench",
}

// roleParameters is a map from the system role to the parameters we have to
Expand Down Expand Up @@ -78,7 +78,7 @@ var roleParameters = map[common.SystemRole][]string{
"--loglevel=%LOGLEVEL%",
"--component_id=%IDX%",
},
common.SystemRolePhaseTwoGen: []string{
common.SystemRoleParsecGen: []string{
"--component_id=%IDX%",
"--loadgen_accounts=%ACCOUNTS%",
"--loadgen_agent_affinity=%LGAFFINITY%",
Expand Down Expand Up @@ -373,8 +373,8 @@ func (t *TestRunManager) RunBinaries(
return t.RunBinariesAtomizer(tr, envs, cmd, failures)
} else if t.Is2PC(tr.Architecture) {
return t.RunBinariesTwoPhase(tr, envs, cmd, failures)
} else if t.IsPhaseTwo(tr.Architecture) {
return t.RunBinariesPhaseTwo(tr, envs, cmd, failures)
} else if t.IsParsec(tr.Architecture) {
return t.RunBinariesParsec(tr, envs, cmd, failures)
}
return fmt.Errorf("unknown architecture: [%s]", tr.Architecture)
}
2 changes: 1 addition & 1 deletion coordinator/testruns/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (t *TestRunManager) ExecuteTestRun(tr *common.TestRun) {
}
}

if !t.IsPhaseTwo(tr.Architecture) {
if !t.IsParsec(tr.Architecture) {
// Generate the configuration file the system needs based on the
// configured
// parameters in the UI
Expand Down
2 changes: 1 addition & 1 deletion coordinator/testruns/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (t *TestRunManager) SubstituteParameters(
fmt.Sprintf("%d", tr.LoadGenAccounts),
)
p = strings.ReplaceAll(p, "%LOGLEVEL%", t.RoleLogLevel(tr, r))
if r.Role == common.SystemRolePhaseTwoGen {
if r.Role == common.SystemRoleParsecGen {
p = strings.ReplaceAll(p, "%LGAFFINITY%", t.LoadGenAffinity(tr, r))
}
newParams = append(newParams, p)
Expand Down
4 changes: 2 additions & 2 deletions coordinator/testruns/roleconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (t *TestRunManager) writeTestRunConfigVariables(
numRoles := t.countRoles(tr)
numClis := 0
numClis += numRoles[common.SystemRoleAtomizerCliWatchtower]
numClis += numRoles[common.SystemRolePhaseTwoGen]
numClis += numRoles[common.SystemRoleParsecGen]
numClis += numRoles[common.SystemRoleTwoPhaseGen]

// Calculate target per role
Expand Down Expand Up @@ -355,7 +355,7 @@ func (t *TestRunManager) RoleTelLevel(
tellevel = tr.WatchtowerTelemetryLevel
case common.SystemRoleCoordinator:
tellevel = tr.CoordinatorTelemetryLevel
case common.SystemRolePhaseTwoGen:
case common.SystemRoleParsecGen:
fallthrough
case common.SystemRoleTwoPhaseGen:
fallthrough
Expand Down
2 changes: 1 addition & 1 deletion coordinator/testruns/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (t *TestRunManager) NormalizeRole(
role = common.SystemRoleShard
} else if role == common.SystemRoleSentinelTwoPhase {
role = common.SystemRoleSentinel
} else if role == common.SystemRoleAtomizerCliWatchtower || role == common.SystemRolePhaseTwoGen || role == common.SystemRoleTwoPhaseGen {
} else if role == common.SystemRoleAtomizerCliWatchtower || role == common.SystemRoleParsecGen || role == common.SystemRoleTwoPhaseGen {
role = common.SystemRoleLoadGen
}
return role
Expand Down
Loading