Skip to content

Commit

Permalink
Add sales and maximum prices report
Browse files Browse the repository at this point in the history
  • Loading branch information
indigane committed Feb 6, 2025
1 parent 1082817 commit 8a0a9b7
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 2 deletions.
234 changes: 232 additions & 2 deletions backend/hitas/services/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from statistics import mean
from typing import Any, Callable, Iterable, Literal, NamedTuple, TypeAlias, TypedDict, TypeVar, Union

from openpyxl.styles import Alignment, Border, Side
from openpyxl.styles import Alignment, Border, Font, Side
from openpyxl.workbook import Workbook
from openpyxl.worksheet.worksheet import Worksheet
from rest_framework.exceptions import ValidationError
Expand All @@ -20,6 +20,7 @@
HousingCompanyWithUnregulatedReportAnnotations,
RegulationStatus,
)
from hitas.models.indices import SurfaceAreaPriceCeiling
from hitas.models.ownership import Ownership, OwnershipWithApartmentCount
from hitas.utils import format_sheet, resize_columns

Expand All @@ -39,6 +40,18 @@ class SalesReportColumns(NamedTuple):
total_price_per_square_meter: Decimal | str


class SalesAndMaximumPricesReportColumns(NamedTuple):
cost_area: int | str
postal_code: str
apartment_address: str
surface_area_square_meter: Decimal | str
purchase_date: datetime.date | str
total_price: Decimal | str
total_price_per_square_meter: Decimal | str
maximum_price: Decimal | str
maximum_price_per_square_meter: Decimal | str


class RegulatedHousingCompaniesReportColumns(NamedTuple):
cost_area: int | str
postal_code: str
Expand Down Expand Up @@ -169,7 +182,7 @@ def build_sales_report_excel(sales: list[ApartmentSale]) -> Workbook:
raise ValidationError(
detail={
api_settings.NON_FIELD_ERRORS_KEY: (
f"Surface are zero or missing for apartment {sale.apartment.address!r}. "
f"Surface area zero or missing for apartment {sale.apartment.address!r}. "
f"Cannot calculate price per square meter."
)
},
Expand Down Expand Up @@ -319,6 +332,223 @@ def conditional_range(value_range: str, **comparison_ranges_to_values: Any) -> l
return workbook


def build_sales_and_maximum_prices_report_excel(sales: list[ApartmentSale]) -> Workbook:
workbook = Workbook()
worksheet: Worksheet = workbook.active

column_headers = SalesAndMaximumPricesReportColumns(
cost_area="Kalleusalue",
postal_code="Postinumero",
apartment_address="Osoite",
surface_area_square_meter="m2",
purchase_date="Kauppapäivä",
total_price="Toteutunut kauppahinta",
total_price_per_square_meter="Kaupan neliöhinta",
maximum_price="Enimmäishinta",
maximum_price_per_square_meter="Enimmäishinnan neiöhinta",
)
worksheet.append(column_headers)

# Prefetch surface area price ceilings
surface_area_price_ceilings = {}
for month_obj, value in SurfaceAreaPriceCeiling.objects.all().values_list("month", "value"):
surface_area_price_ceilings[(month_obj.year, month_obj.month)] = value

for sale in sales:
if not sale.apartment.surface_area:
raise ValidationError(
detail={
api_settings.NON_FIELD_ERRORS_KEY: (
f"Surface area zero or missing for apartment {sale.apartment.address!r}. "
f"Cannot calculate price per square meter."
)
},
)

# Pick calculation closest to the purchase date (latest calculation)
# but only if it is valid on the purchase date.
maximum_price_calculation = (
sale.apartment.max_price_calculations.filter(
calculation_date__lte=sale.purchase_date,
valid_until__gte=sale.purchase_date,
)
.order_by("-calculation_date")
.first()
)

maximum_price = None
is_maximum_price_fallback = False
if maximum_price_calculation is not None:
maximum_price = maximum_price_calculation.maximum_price
else:
# Fall back to surface area price ceiling
surface_area_price_ceiling = surface_area_price_ceilings.get(
(sale.purchase_date.year, sale.purchase_date.month),
)
if surface_area_price_ceiling is not None:
maximum_price = sale.apartment.surface_area * surface_area_price_ceiling
is_maximum_price_fallback = True

worksheet.append(
SalesAndMaximumPricesReportColumns(
cost_area=sale.apartment.postal_code.cost_area,
postal_code=sale.apartment.postal_code.value,
apartment_address=sale.apartment.address,
surface_area_square_meter=sale.apartment.surface_area,
purchase_date=sale.purchase_date,
total_price=sale.total_price,
total_price_per_square_meter=sale.total_price / sale.apartment.surface_area,
maximum_price=maximum_price if maximum_price is not None else "",
maximum_price_per_square_meter=(
(maximum_price / sale.apartment.surface_area) if maximum_price is not None else ""
),
)
)

if is_maximum_price_fallback:
maximum_price_cell = worksheet.cell(row=worksheet.max_row, column=8)
maximum_price_per_square_meter_cell = worksheet.cell(row=worksheet.max_row, column=9)
maximum_price_cell.font = Font(italic=True)
maximum_price_per_square_meter_cell.font = Font(italic=True)

last_row = worksheet.max_row
worksheet.auto_filter.ref = worksheet.dimensions

empty_row = SalesAndMaximumPricesReportColumns(
cost_area="",
postal_code="",
apartment_address="",
surface_area_square_meter="",
purchase_date="",
total_price="",
total_price_per_square_meter="",
maximum_price="",
maximum_price_per_square_meter="",
)

# There needs to be an empty row for sorting and filtering to work properly
worksheet.append(empty_row)

@cache
def unwrap_range(cell_range: str) -> list[Any]:
return [cell.value for row in worksheet[cell_range] for cell in row]

@cache
def conditional_range(value_range: str, **comparison_ranges_to_values: Any) -> list[Any]:
"""
Returns values from `value_range` where the given comparison ranges
contain all values as indicated by the mapping.
"""
comparison_values: list[Any] = list(comparison_ranges_to_values.values())
unwrapped_comparison_ranges = zip(*(unwrap_range(rang) for rang in comparison_ranges_to_values), strict=False)
zipped_ranges = zip(unwrap_range(value_range), unwrapped_comparison_ranges, strict=True)
return [
value
for value, range_values in zipped_ranges
if all(range_value == comparison_values[i] for i, range_value in enumerate(range_values))
]

summary_start = worksheet.max_row + 1

summary_rows: list[SalesReportSummaryDefinition] = [
SalesReportSummaryDefinition(
title="Kaikki kaupat",
subtitle="Lukumäärä",
func=lambda x: len(unwrap_range(x)),
),
SalesReportSummaryDefinition(
subtitle="Keskiarvo",
func=lambda x: mean(unwrap_range(x) or [0]),
),
SalesReportSummaryDefinition(
subtitle="Maksimi",
func=lambda x: max(unwrap_range(x), default=0),
),
SalesReportSummaryDefinition(
subtitle="Minimi",
func=lambda x: min(unwrap_range(x), default=0),
),
]

for cost_area in range(1, 5):
summary_rows += [
None, # empty row
SalesReportSummaryDefinition(
title=f"Kalleusalue {cost_area}",
subtitle="Lukumäärä",
func=lambda x, y=cost_area: len(conditional_range(x, **{f"A2:A{last_row}": y})),
),
SalesReportSummaryDefinition(
subtitle="Keskiarvo",
func=lambda x, y=cost_area: mean(conditional_range(x, **{f"A2:A{last_row}": y}) or [0]),
),
SalesReportSummaryDefinition(
subtitle="Maksimi",
func=lambda x, y=cost_area: max(conditional_range(x, **{f"A2:A{last_row}": y}), default=0),
),
SalesReportSummaryDefinition(
subtitle="Minimi",
func=lambda x, y=cost_area: min(conditional_range(x, **{f"A2:A{last_row}": y}), default=0),
),
]

sales_count_rows: list[int] = []

for definition in summary_rows:
if definition is None:
worksheet.append(empty_row)
continue

if definition.subtitle == "Lukumäärä":
sales_count_rows.append(worksheet.max_row + 1)

worksheet.append(
SalesAndMaximumPricesReportColumns(
cost_area="",
postal_code="",
apartment_address="",
surface_area_square_meter=definition.title,
purchase_date=definition.subtitle,
total_price=definition.func(f"F2:F{last_row}"),
total_price_per_square_meter=definition.func(f"G2:G{last_row}"),
maximum_price="",
maximum_price_per_square_meter="",
),
)

euro_format = "#,##0\\ \\€"
euro_per_square_meter_format = "#,##0.00\\ \\\\/\\m²"
date_format = "DD.MM.YYYY"
column_letters = string.ascii_uppercase[: len(column_headers)]

format_sheet(
worksheet,
formatting_rules={
# Add a border to the header row
**{f"{letter}1": {"border": Border(bottom=Side(style="thin"))} for letter in column_letters},
# Add a border to the last data row
**{f"{letter}{last_row}": {"border": Border(bottom=Side(style="thin"))} for letter in column_letters},
# Align the summary titles to the right
**{
f"E{summary_start + i}": {"alignment": Alignment(horizontal="right")}
for i in range(0, len(summary_rows))
},
"B": {"alignment": Alignment(horizontal="right")},
"E": {"number_format": date_format},
"F": {"number_format": euro_format},
"G": {"number_format": euro_per_square_meter_format},
"H": {"number_format": euro_format},
"I": {"number_format": euro_per_square_meter_format},
# Reset number format for sales count cells
**{f"{letter}{row}": {"number_format": "General"} for row in sales_count_rows for letter in "FG"},
},
)

resize_columns(worksheet)
worksheet.protection.sheet = True
return workbook


def build_regulated_housing_companies_report_excel(
housing_companies: list[HousingCompanyWithRegulatedReportAnnotations],
) -> Workbook:
Expand Down
109 changes: 109 additions & 0 deletions backend/hitas/tests/apis/test_api_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
OwnerFactory,
OwnershipFactory,
)
from hitas.tests.factories.apartment import ApartmentMaximumPriceCalculationFactory
from hitas.tests.factories.indices import SurfaceAreaPriceCeilingFactory

# Sales report

Expand Down Expand Up @@ -423,6 +425,113 @@ def test__api__sales_report__surface_area_missing(api_client: HitasAPIClient):
}


@pytest.mark.django_db
def test__api__sales_and_maximum_prices_report(api_client: HitasAPIClient):
# Create sales in the report interval
sale_1: ApartmentSale = ApartmentSaleFactory.create(
purchase_date=datetime.date(2020, 1, 1),
purchase_price=50_000,
apartment_share_of_housing_company_loans=10_000,
apartment__surface_area=100,
apartment__building__real_estate__housing_company__postal_code__value="00001",
apartment__building__real_estate__housing_company__postal_code__cost_area=1,
)
sale_2: ApartmentSale = ApartmentSaleFactory.create(
purchase_date=datetime.date(2020, 2, 15),
purchase_price=130_000,
apartment_share_of_housing_company_loans=100,
apartment__surface_area=100,
apartment__building__real_estate__housing_company__postal_code__value="00002",
apartment__building__real_estate__housing_company__postal_code__cost_area=1,
)
# Valid calculation
ApartmentMaximumPriceCalculationFactory.create(
apartment=sale_1.apartment,
calculation_date=datetime.date(2019, 1, 1),
valid_until=datetime.date(2022, 1, 1),
maximum_price=150_000,
)
# Valid calculation but older
ApartmentMaximumPriceCalculationFactory.create(
apartment=sale_1.apartment,
calculation_date=datetime.date(2018, 1, 1),
valid_until=datetime.date(2021, 1, 1),
maximum_price=140_000,
)
# Invalid calculation, too old
ApartmentMaximumPriceCalculationFactory.create(
apartment=sale_1.apartment,
calculation_date=datetime.date(2018, 1, 1),
valid_until=datetime.date(2019, 1, 1),
maximum_price=120_000,
)
# Invalid calculation, too new
ApartmentMaximumPriceCalculationFactory.create(
apartment=sale_1.apartment,
calculation_date=datetime.date(2018, 1, 1),
valid_until=datetime.date(2019, 1, 1),
maximum_price=160_000,
)
# Sale 2 should fall back to using surface area price ceiling when no calculation is found
SurfaceAreaPriceCeilingFactory.create(month=datetime.date(2020, 2, 1), value=2000)

data = {
"start_date": "2020-01-01",
"end_date": "2020-02-28",
}
url = reverse("hitas:sales-and-maximum-prices-report-list") + "?" + urlencode(data)
response: HttpResponse = api_client.get(url)

assert response.status_code == status.HTTP_200_OK

workbook: Workbook = load_workbook(BytesIO(response.content), data_only=False)
worksheet: Worksheet = workbook.worksheets[0]

rows = list(worksheet.values)
assert len(rows[0][0]) > 0, "Row 1 column 1 should have a title"
assert len(rows[0][1]) > 0, "Row 1 column 2 should have a title"
assert len(rows[0][2]) > 0, "Row 1 column 3 should have a title"
assert len(rows[0][3]) > 0, "Row 1 column 4 should have a title"
assert len(rows[0][4]) > 0, "Row 1 column 5 should have a title"
assert len(rows[0][5]) > 0, "Row 1 column 6 should have a title"
assert len(rows[0][6]) > 0, "Row 1 column 7 should have a title"
# Sale row 1
assert rows[1][0] == sale_1.apartment.postal_code.cost_area
assert rows[1][1] == sale_1.apartment.postal_code.value
assert rows[1][2] == sale_1.apartment.address
assert rows[1][3] == sale_1.apartment.surface_area
assert rows[1][4] == datetime.datetime.fromisoformat(sale_1.purchase_date.isoformat())
assert rows[1][5] == 60_000 # 50_000 € + 10_000 €
assert rows[1][6] == 600 # (50_000 € + 10_000 €) / 100 m²
assert rows[1][7] == 150_000
assert rows[1][8] == 1_500 # 150_000 € / 100 m²
# Sale row 2
assert rows[2][0] == sale_2.apartment.postal_code.cost_area
assert rows[2][1] == sale_2.apartment.postal_code.value
assert rows[2][2] == sale_2.apartment.address
assert rows[2][3] == sale_2.apartment.surface_area
assert rows[2][4] == datetime.datetime.fromisoformat(sale_2.purchase_date.isoformat())
assert rows[2][5] == 130_100
assert rows[2][6] == 1_301 # (130_000 + 100) / 100 m²
assert rows[2][7] == 200_000 # 2000 € * 100 m²
assert rows[2][8] == 2_000
# Totals - all sales
assert rows[4][5] == 2, "Total amount of sales should be 2"
assert rows[5][5] == 95_050, "Mean should be 95_050"
assert rows[6][5] == 130_100, "Maximum should be 130_100"
assert rows[7][5] == 60_000, "Minimum should be 60_000"
# Totals - cost area 1
assert rows[9][5] == 2, "Total amount of sales in cost area 1 should be 2"
assert rows[10][5] == 95_050, "Mean should be 95_050"
assert rows[11][5] == 130_100, "Maximum should be 130_100"
assert rows[12][5] == 60_000, "Minimum should be 60_000"
# Totals - cost area 2
assert rows[14][5] == 0, "Total amount of sales in cost area 2 should be 0"
assert rows[15][5] == 0, "Mean should be 0"
assert rows[16][5] == 0, "Maximum should be 0"
assert rows[17][5] == 0, "Minimum should be 0"


# Regulated housing companies report


Expand Down
Loading

0 comments on commit 8a0a9b7

Please sign in to comment.