Skip to content

Commit

Permalink
Merge pull request #8 from nikosT/devel
Browse files Browse the repository at this point in the history
 New allocation policies introduced
  • Loading branch information
nikosT authored Apr 6, 2024
2 parents e993adc + 0b5af30 commit 933abeb
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 5 deletions.
2 changes: 1 addition & 1 deletion etc/oar/admission_rules.d/15_check_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
r8 = "^allowed=\\w+$"
r9 = "^inner=\\w+$"
r10 = "^timesharing=(?:(?:\\*|user),(?:\\*|name)|(?:\\*|name),(?:\\*|user))$"
r11 = "^(?:compact|spread|r_spread|no_pref|ml)$"
r11 = "^(?:compact|f_compact|spread|f_spread|co_loc|f_co_loc|no_pref|ml)$"
all_re = re.compile(
"(%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s)"
% (r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11)
Expand Down
121 changes: 121 additions & 0 deletions etc/oar/admission_rules.d/26_co_loc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
def estimate_job_nb_resources_in_spread(
    session, config, resource_request, j_properties
):
    """Estimate how many resources a job could get when half of each CPU is usable.

    For every moldable alternative in ``resource_request`` the resources matching
    the job's SQL constraints are computed, then, for each entry of the ``cpu``
    hierarchy level, the available cores located at positions
    ``len(cpu) // 2`` and above are removed before the hierarchy search is run.

    :param session: SQLAlchemy session used to query ``Resource`` ids
    :param config: configuration mapping; ``DEFAULT_JOB_WALLTIME`` is read when an
        alternative has no walltime
    :param resource_request: iterable of ``(resource_desc, walltime)`` pairs, where
        ``resource_desc`` is a list of dicts with ``"property"`` and ``"resources"``
        keys
    :param j_properties: job-level SQL property constraint (may be ``None``/empty)
    :return: a 3-tuple ``(error, is_resource_available, estimated_nb_resources)``.
        On success ``error`` is ``(0, "")`` and ``estimated_nb_resources`` is a list
        of ``(nb_resources, walltime)`` tuples, one per moldable alternative.
        On failure the tuple is ``((code, message), None, None)`` with code ``-3``
        for an unknown resource name and ``-5`` for a failing SQL constraint or
        when no alternative can be satisfied.
    """
    from sqlalchemy import exc

    estimated_nb_resources = []
    is_resource_available = False
    resource_set = ResourceSet(session, config)
    resources_itvs = resource_set.roid_itvs

    for resource_desc, walltime in resource_request:
        if not walltime:
            walltime = str(config["DEFAULT_JOB_WALLTIME"])

        estimated_nb_res = 0

        for prop_res in resource_desc:
            jrg_grp_property = prop_res["property"]
            resource_value_lst = prop_res["resources"]

            #
            # determine resource constraints
            #
            if (not j_properties) and (
                not jrg_grp_property or (jrg_grp_property == "type='default'")
            ):  # TODO change to re.match
                # no explicit constraint: every known resource is a candidate
                constraints = copy.copy(resource_set.roid_itvs)
            else:
                and_sql = ""
                if j_properties and jrg_grp_property:
                    and_sql = " AND "
                if j_properties is None:
                    j_properties = ""
                if jrg_grp_property is None:
                    jrg_grp_property = ""

                sql_constraints = j_properties + and_sql + jrg_grp_property

                try:
                    request_constraints = (
                        session.query(Resource.id).filter(text(sql_constraints)).all()
                    )
                except exc.SQLAlchemyError as sql_error:
                    # Report the caught exception itself; the previous code
                    # stringified the ``sqlalchemy.exc`` module by mistake.
                    error_code = -5
                    error_msg = (
                        "Bad resource SQL constraints request:"
                        + sql_constraints
                        + "\n"
                        + "SQLAlchemyError: "
                        + str(sql_error)
                    )
                    error = (error_code, error_msg)
                    return (error, None, None)

                # translate database resource ids into ordered resource ids
                roids = [resource_set.rid_i2o[int(y[0])] for y in request_constraints]
                constraints = ProcSet(*roids)

            hy_levels = []
            hy_nbs = []
            for resource_value in resource_value_lst:
                res_name = resource_value["resource"]
                if res_name not in resource_set.hierarchy:
                    possible_options = ", ".join(resource_set.hierarchy.keys())
                    error_code = -3
                    error_msg = (
                        f"Bad resources name: {res_name} is not a valid resources name."
                        f"Valid resource names are: {possible_options}"
                    )
                    error = (error_code, error_msg)
                    return (error, None, None)

                value = resource_value["value"]
                hy_levels.append(resource_set.hierarchy[res_name])
                hy_nbs.append(int(value))

            cts_resources_itvs = constraints & resources_itvs

            # For each cpu, drop the available cores whose position is in the
            # upper half of the cpu's size, so the search below only sees the
            # remaining ones.
            # NOTE(review): presumably this models the policy using at most half
            # of each socket - confirm against the scheduler's find function.
            for soc in resource_set.hierarchy["cpu"]:
                avail_cores = soc & cts_resources_itvs
                cts_resources_itvs -= ProcSet(*avail_cores[len(soc) // 2 : len(soc)])

            res_itvs = find_resource_hierarchies_scattered(
                cts_resources_itvs, hy_levels, hy_nbs
            )
            if res_itvs:
                estimated_nb_res += len(res_itvs)

        if estimated_nb_res > 0:
            is_resource_available = True

        estimated_nb_resources.append((estimated_nb_res, walltime))

    if not is_resource_available:
        error = (-5, "There are not enough resources for your request")
        return (error, None, None)

    return ((0, ""), is_resource_available, estimated_nb_resources)


# Admission rule for jobs submitted with the "co_loc" type: rename the type to
# "find=co_loc" and reject the job when the half-socket estimation fails.
# NOTE(review): indentation was lost in the reviewed copy; the feasibility check
# is assumed to apply only to "co_loc" jobs - confirm against the repository.
if "co_loc" in types:
    # Rewrite only the exact "co_loc" entry. A substring replace would also
    # mangle other types that merely contain the text, e.g. "f_co_loc" would
    # become "f_find=co_loc".
    types = ["find=co_loc" if t == "co_loc" else t for t in types]

    if (
        estimate_job_nb_resources_in_spread(
            session, config, resource_request, properties
        )[0][0]
        < 0
    ):
        raise Exception(
            "# ADMISSION RULE> There are not enough resources for your request using the spread method"
        )
2 changes: 2 additions & 0 deletions etc/oar/admission_rules.d/27_f_compact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Admission rule for jobs submitted with the "f_compact" type: rename the type
# to "find=f_compact".
# NOTE(review): presumably "find=<name>" selects the `oar.find_func` plugin of
# that name - confirm against the scheduler.
if "f_compact" in types:
    # Exact match only: a substring replace would also rewrite unrelated types
    # that merely contain the text (e.g. "allowed=f_compact").
    types = ["find=f_compact" if t == "f_compact" else t for t in types]
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ def estimate_job_nb_resources_in_spread(
return ((0, ""), is_resource_available, estimated_nb_resources)


if "r_spread" in types:
types = list(map(lambda t: t.replace("r_spread", "find=r_spread"), types))
if "f_spread" in types:
types = list(map(lambda t: t.replace("f_spread", "find=f_spread"), types))

if (
estimate_job_nb_resources_in_spread(
Expand Down
121 changes: 121 additions & 0 deletions etc/oar/admission_rules.d/29_f_co_loc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
def estimate_job_nb_resources_in_spread(
    session, config, resource_request, j_properties
):
    """Estimate how many resources a job could get when half of each CPU is usable.

    For every moldable alternative in ``resource_request`` the resources matching
    the job's SQL constraints are computed, then, for each entry of the ``cpu``
    hierarchy level, the available cores located at positions
    ``len(cpu) // 2`` and above are removed before the hierarchy search is run.

    :param session: SQLAlchemy session used to query ``Resource`` ids
    :param config: configuration mapping; ``DEFAULT_JOB_WALLTIME`` is read when an
        alternative has no walltime
    :param resource_request: iterable of ``(resource_desc, walltime)`` pairs, where
        ``resource_desc`` is a list of dicts with ``"property"`` and ``"resources"``
        keys
    :param j_properties: job-level SQL property constraint (may be ``None``/empty)
    :return: a 3-tuple ``(error, is_resource_available, estimated_nb_resources)``.
        On success ``error`` is ``(0, "")`` and ``estimated_nb_resources`` is a list
        of ``(nb_resources, walltime)`` tuples, one per moldable alternative.
        On failure the tuple is ``((code, message), None, None)`` with code ``-3``
        for an unknown resource name and ``-5`` for a failing SQL constraint or
        when no alternative can be satisfied.
    """
    from sqlalchemy import exc

    estimated_nb_resources = []
    is_resource_available = False
    resource_set = ResourceSet(session, config)
    resources_itvs = resource_set.roid_itvs

    for resource_desc, walltime in resource_request:
        if not walltime:
            walltime = str(config["DEFAULT_JOB_WALLTIME"])

        estimated_nb_res = 0

        for prop_res in resource_desc:
            jrg_grp_property = prop_res["property"]
            resource_value_lst = prop_res["resources"]

            #
            # determine resource constraints
            #
            if (not j_properties) and (
                not jrg_grp_property or (jrg_grp_property == "type='default'")
            ):  # TODO change to re.match
                # no explicit constraint: every known resource is a candidate
                constraints = copy.copy(resource_set.roid_itvs)
            else:
                and_sql = ""
                if j_properties and jrg_grp_property:
                    and_sql = " AND "
                if j_properties is None:
                    j_properties = ""
                if jrg_grp_property is None:
                    jrg_grp_property = ""

                sql_constraints = j_properties + and_sql + jrg_grp_property

                try:
                    request_constraints = (
                        session.query(Resource.id).filter(text(sql_constraints)).all()
                    )
                except exc.SQLAlchemyError as sql_error:
                    # Report the caught exception itself; the previous code
                    # stringified the ``sqlalchemy.exc`` module by mistake.
                    error_code = -5
                    error_msg = (
                        "Bad resource SQL constraints request:"
                        + sql_constraints
                        + "\n"
                        + "SQLAlchemyError: "
                        + str(sql_error)
                    )
                    error = (error_code, error_msg)
                    return (error, None, None)

                # translate database resource ids into ordered resource ids
                roids = [resource_set.rid_i2o[int(y[0])] for y in request_constraints]
                constraints = ProcSet(*roids)

            hy_levels = []
            hy_nbs = []
            for resource_value in resource_value_lst:
                res_name = resource_value["resource"]
                if res_name not in resource_set.hierarchy:
                    possible_options = ", ".join(resource_set.hierarchy.keys())
                    error_code = -3
                    error_msg = (
                        f"Bad resources name: {res_name} is not a valid resources name."
                        f"Valid resource names are: {possible_options}"
                    )
                    error = (error_code, error_msg)
                    return (error, None, None)

                value = resource_value["value"]
                hy_levels.append(resource_set.hierarchy[res_name])
                hy_nbs.append(int(value))

            cts_resources_itvs = constraints & resources_itvs

            # For each cpu, drop the available cores whose position is in the
            # upper half of the cpu's size, so the search below only sees the
            # remaining ones.
            # NOTE(review): presumably this models the policy using at most half
            # of each socket - confirm against the scheduler's find function.
            for soc in resource_set.hierarchy["cpu"]:
                avail_cores = soc & cts_resources_itvs
                cts_resources_itvs -= ProcSet(*avail_cores[len(soc) // 2 : len(soc)])

            res_itvs = find_resource_hierarchies_scattered(
                cts_resources_itvs, hy_levels, hy_nbs
            )
            if res_itvs:
                estimated_nb_res += len(res_itvs)

        if estimated_nb_res > 0:
            is_resource_available = True

        estimated_nb_resources.append((estimated_nb_res, walltime))

    if not is_resource_available:
        error = (-5, "There are not enough resources for your request")
        return (error, None, None)

    return ((0, ""), is_resource_available, estimated_nb_resources)


# Admission rule for jobs submitted with the "f_co_loc" type: rename the type to
# "find=f_co_loc" and reject the job when the half-socket estimation fails.
# NOTE(review): indentation was lost in the reviewed copy; the feasibility check
# is assumed to apply only to "f_co_loc" jobs - confirm against the repository.
if "f_co_loc" in types:
    # Exact match only: a substring replace would also rewrite unrelated types
    # that merely contain the text (e.g. "allowed=f_co_loc").
    types = ["find=f_co_loc" if t == "f_co_loc" else t for t in types]

    if (
        estimate_job_nb_resources_in_spread(
            session, config, resource_request, properties
        )[0][0]
        < 0
    ):
        raise Exception(
            "# ADMISSION RULE> There are not enough resources for your request using the spread method"
        )
71 changes: 70 additions & 1 deletion oar/kao/custom_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False):
return result


def r_spread(itvs_slots, hy_res_rqts, hy, beginning_slotset):
def co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset):
"""
Given a job resource request and a set of resources this function tries to find a matching allocation.
Expand Down Expand Up @@ -162,3 +162,72 @@ def no_pref(itvs_slots, hy_res_rqts, hy, beginning_slotset):
The allocation if found, otherwise an empty :class:`procset.ProcSet`
"""
return compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False)


def f_compact(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True):
    """
    Look for an allocation with the compact policy and fall back to spread.

    The request is first handed to :func:`compact`; when that yields an empty
    result the same request is retried with :func:`spread` (called with its
    default ``reverse`` value).

    :param itvs_slots: A procset of the resources available for the allocation
    :type itvs_slots: :class:`procset.ProcSet`
    :param hy_res_rqts: The job's request
    :param hy: The definition of the resources hierarchy
    :param beginning_slotset: forwarded unchanged to the underlying policies
    :param reverse: forwarded to :func:`compact` only
    :return: the result of :func:`compact` when it is non-empty, otherwise
        whatever :func:`spread` returns
    """
    allocation = compact(
        itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse
    )

    # compact found room for the job: nothing else to try
    if len(allocation) != 0:
        return allocation

    # compact came back empty, so retry the request with the spread policy
    return spread(itvs_slots, hy_res_rqts, hy, beginning_slotset)


def f_spread(itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=False):
    """
    Look for an allocation with the spread policy and fall back to compact.

    The request is first handed to :func:`spread`; when that yields an empty
    result the same request is retried with :func:`compact` (called with its
    default ``reverse`` value).

    :param itvs_slots: A procset of the resources available for the allocation
    :type itvs_slots: :class:`procset.ProcSet`
    :param hy_res_rqts: The job's request
    :param hy: The definition of the resources hierarchy
    :param beginning_slotset: forwarded unchanged to the underlying policies
    :param reverse: forwarded to :func:`spread` only
    :return: the result of :func:`spread` when it is non-empty, otherwise
        whatever :func:`compact` returns
    """
    allocation = spread(
        itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=reverse
    )

    # spread found room for the job: nothing else to try
    if len(allocation) != 0:
        return allocation

    # spread came back empty, so retry the request with the compact policy
    return compact(itvs_slots, hy_res_rqts, hy, beginning_slotset)


def f_co_loc(itvs_slots, hy_res_rqts, hy, beginning_slotset):
    """
    Look for an allocation by delegating to :func:`f_spread` with ``reverse=True``.

    :param itvs_slots: A procset of the resources available for the allocation
    :type itvs_slots: :class:`procset.ProcSet`
    :param hy_res_rqts: The job's request
    :param hy: The definition of the resources hierarchy
    :param beginning_slotset: forwarded unchanged to :func:`f_spread`
    :return: whatever :func:`f_spread` returns for the reversed search
    """
    allocation = f_spread(
        itvs_slots, hy_res_rqts, hy, beginning_slotset, reverse=True
    )
    return allocation
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,8 @@ build-backend = "poetry.masonry.api"
[tool.poetry.plugins."oar.find_func"]
compact = "oar.kao.custom_scheduling:compact"
spread = "oar.kao.custom_scheduling:spread"
r_spread = "oar.kao.custom_scheduling:r_spread"
co_loc = "oar.kao.custom_scheduling:co_loc"
f_compact = "oar.kao.custom_scheduling:f_compact"
f_spread = "oar.kao.custom_scheduling:f_spread"
f_co_loc = "oar.kao.custom_scheduling:f_co_loc"
no_pref = "oar.kao.custom_scheduling:no_pref"

0 comments on commit 933abeb

Please sign in to comment.