Skip to content

Commit

Permalink
Add sales and maximum prices report
Browse files Browse the repository at this point in the history
  • Loading branch information
indigane committed Feb 6, 2025
1 parent 1082817 commit 5693dd1
Show file tree
Hide file tree
Showing 9 changed files with 423 additions and 4 deletions.
234 changes: 232 additions & 2 deletions backend/hitas/services/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from statistics import mean
from typing import Any, Callable, Iterable, Literal, NamedTuple, TypeAlias, TypedDict, TypeVar, Union

from openpyxl.styles import Alignment, Border, Side
from openpyxl.styles import Alignment, Border, Font, Side
from openpyxl.workbook import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from rest_framework.exceptions import ValidationError
Expand All @@ -20,6 +20,7 @@
HousingCompanyWithUnregulatedReportAnnotations,
RegulationStatus,
)
from hitas.models.indices import SurfaceAreaPriceCeiling
from hitas.models.ownership import Ownership, OwnershipWithApartmentCount
from hitas.utils import format_sheet, resize_columns

Expand All @@ -39,6 +40,18 @@ class SalesReportColumns(NamedTuple):
total_price_per_square_meter: Decimal | str


class SalesAndMaximumPricesReportColumns(NamedTuple):
cost_area: int | str
postal_code: str
apartment_address: str
surface_area_square_meter: Decimal | str
purchase_date: datetime.date | str
total_price: Decimal | str
total_price_per_square_meter: Decimal | str
maximum_price: Decimal | str
maximum_price_per_square_meter: Decimal | str


class RegulatedHousingCompaniesReportColumns(NamedTuple):
cost_area: int | str
postal_code: str
Expand Down Expand Up @@ -169,7 +182,7 @@ def build_sales_report_excel(sales: list[ApartmentSale]) -> Workbook:
raise ValidationError(
detail={
api_settings.NON_FIELD_ERRORS_KEY: (
f"Surface are zero or missing for apartment {sale.apartment.address!r}. "
f"Surface area zero or missing for apartment {sale.apartment.address!r}. "
f"Cannot calculate price per square meter."
)
},
Expand Down Expand Up @@ -319,6 +332,223 @@ def conditional_range(value_range: str, **comparison_ranges_to_values: Any) -> l
return workbook


def build_sales_and_maximum_prices_report_excel(sales: list[ApartmentSale]) -> Workbook:
workbook = Workbook()
worksheet: Worksheet = workbook.active

column_headers = SalesAndMaximumPricesReportColumns(
cost_area="Kalleusalue",
postal_code="Postinumero",
apartment_address="Osoite",
surface_area_square_meter="m2",
purchase_date="Kauppapäivä",
total_price="Toteutunut kauppahinta",
total_price_per_square_meter="Kaupan neliöhinta",
maximum_price="Enimmäishinta",
maximum_price_per_square_meter="Enimmäishinnan neiöhinta",
)
worksheet.append(column_headers)

# Prefetch surface area price ceilings
surface_area_price_ceilings = {}
for month_obj, value in SurfaceAreaPriceCeiling.objects.all().values_list("month", "value"):
surface_area_price_ceilings[(month_obj.year, month_obj.month)] = value

for sale in sales:
if not sale.apartment.surface_area:
raise ValidationError(
detail={
api_settings.NON_FIELD_ERRORS_KEY: (
f"Surface area zero or missing for apartment {sale.apartment.address!r}. "
f"Cannot calculate price per square meter."
)
},
)

# Pick calculation closest to the purchase date (latest calculation)
# but only if it is valid on the purchase date.
maximum_price_calculation = (
sale.apartment.max_price_calculations.filter(
calculation_date__lte=sale.purchase_date,
valid_until__gte=sale.purchase_date,
)
.order_by("-calculation_date")
.first()
)

maximum_price = None
is_maximum_price_fallback = False
if maximum_price_calculation is not None:
maximum_price = maximum_price_calculation.maximum_price
else:
# Fall back to surface area price ceiling
surface_area_price_ceiling = surface_area_price_ceilings.get(
(sale.purchase_date.year, sale.purchase_date.month),
)
if surface_area_price_ceiling is not None:
maximum_price = sale.apartment.surface_area * surface_area_price_ceiling
is_maximum_price_fallback = True

worksheet.append(
SalesAndMaximumPricesReportColumns(
cost_area=sale.apartment.postal_code.cost_area,
postal_code=sale.apartment.postal_code.value,
apartment_address=sale.apartment.address,
surface_area_square_meter=sale.apartment.surface_area,
purchase_date=sale.purchase_date,
total_price=sale.total_price,
total_price_per_square_meter=sale.total_price / sale.apartment.surface_area,
maximum_price=maximum_price if maximum_price is not None else "",
maximum_price_per_square_meter=(
(maximum_price / sale.apartment.surface_area) if maximum_price is not None else ""
),
)
)

if is_maximum_price_fallback:
maximum_price_cell = worksheet.cell(row=worksheet.max_row, column=8)
maximum_price_per_square_meter_cell = worksheet.cell(row=worksheet.max_row, column=9)
maximum_price_cell.font = Font(italic=True)
maximum_price_per_square_meter_cell.font = Font(italic=True)

last_row = worksheet.max_row
worksheet.auto_filter.ref = worksheet.dimensions

empty_row = SalesAndMaximumPricesReportColumns(
cost_area="",
postal_code="",
apartment_address="",
surface_area_square_meter="",
purchase_date="",
total_price="",
total_price_per_square_meter="",
maximum_price="",
maximum_price_per_square_meter="",
)

# There needs to be an empty row for sorting and filtering to work properly
worksheet.append(empty_row)

@cache
def unwrap_range(cell_range: str) -> list[Any]:
return [cell.value for row in worksheet[cell_range] for cell in row]

@cache
def conditional_range(value_range: str, **comparison_ranges_to_values: Any) -> list[Any]:
"""
Returns values from `value_range` where the given comparison ranges
contain all values as indicated by the mapping.
"""
comparison_values: list[Any] = list(comparison_ranges_to_values.values())
unwrapped_comparison_ranges = zip(*(unwrap_range(rang) for rang in comparison_ranges_to_values), strict=False)
zipped_ranges = zip(unwrap_range(value_range), unwrapped_comparison_ranges, strict=True)
return [
value
for value, range_values in zipped_ranges
if all(range_value == comparison_values[i] for i, range_value in enumerate(range_values))
]

summary_start = worksheet.max_row + 1

summary_rows: list[SalesReportSummaryDefinition] = [
SalesReportSummaryDefinition(
title="Kaikki kaupat",
subtitle="Lukumäärä",
func=lambda x: len(unwrap_range(x)),
),
SalesReportSummaryDefinition(
subtitle="Keskiarvo",
func=lambda x: mean(unwrap_range(x) or [0]),
),
SalesReportSummaryDefinition(
subtitle="Maksimi",
func=lambda x: max(unwrap_range(x), default=0),
),
SalesReportSummaryDefinition(
subtitle="Minimi",
func=lambda x: min(unwrap_range(x), default=0),
),
]

for cost_area in range(1, 5):
summary_rows += [
None, # empty row
SalesReportSummaryDefinition(
title=f"Kalleusalue {cost_area}",
subtitle="Lukumäärä",
func=lambda x, y=cost_area: len(conditional_range(x, **{f"A2:A{last_row}": y})),
),
SalesReportSummaryDefinition(
subtitle="Keskiarvo",
func=lambda x, y=cost_area: mean(conditional_range(x, **{f"A2:A{last_row}": y}) or [0]),
),
SalesReportSummaryDefinition(
subtitle="Maksimi",
func=lambda x, y=cost_area: max(conditional_range(x, **{f"A2:A{last_row}": y}), default=0),
),
SalesReportSummaryDefinition(
subtitle="Minimi",
func=lambda x, y=cost_area: min(conditional_range(x, **{f"A2:A{last_row}": y}), default=0),
),
]

sales_count_rows: list[int] = []

for definition in summary_rows:
if definition is None:
worksheet.append(empty_row)
continue

if definition.subtitle == "Lukumäärä":
sales_count_rows.append(worksheet.max_row + 1)

worksheet.append(
SalesAndMaximumPricesReportColumns(
cost_area="",
postal_code="",
apartment_address="",
surface_area_square_meter=definition.title,
purchase_date=definition.subtitle,
total_price=definition.func(f"F2:F{last_row}"),
total_price_per_square_meter=definition.func(f"G2:G{last_row}"),
maximum_price="",
maximum_price_per_square_meter="",
),
)

euro_format = "#,##0\\ \\€"
euro_per_square_meter_format = "#,##0.00\\ \\\\/\\m²"
date_format = "DD.MM.YYYY"
column_letters = string.ascii_uppercase[: len(column_headers)]

format_sheet(
worksheet,
formatting_rules={
# Add a border to the header row
**{f"{letter}1": {"border": Border(bottom=Side(style="thin"))} for letter in column_letters},
# Add a border to the last data row
**{f"{letter}{last_row}": {"border": Border(bottom=Side(style="thin"))} for letter in column_letters},
# Align the summary titles to the right
**{
f"E{summary_start + i}": {"alignment": Alignment(horizontal="right")}
for i in range(0, len(summary_rows))
},
"B": {"alignment": Alignment(horizontal="right")},
"E": {"number_format": date_format},
"F": {"number_format": euro_format},
"G": {"number_format": euro_per_square_meter_format},
"H": {"number_format": euro_format},
"I": {"number_format": euro_per_square_meter_format},
# Reset number format for sales count cells
**{f"{letter}{row}": {"number_format": "General"} for row in sales_count_rows for letter in "FG"},
},
)

resize_columns(worksheet)
worksheet.protection.sheet = True
return workbook


def build_regulated_housing_companies_report_excel(
housing_companies: list[HousingCompanyWithRegulatedReportAnnotations],
) -> Workbook:
Expand Down
Loading

0 comments on commit 5693dd1

Please sign in to comment.