Skip to content

Commit

Permalink
Merge pull request #192 from investigativedata/develop
Browse files (browse the repository at this point in the history)
v0.6.0
  • Loading branch information
simonwoerpel authored Feb 24, 2024
2 parents 1c7c3a0 + 48ef9ee commit 26fd016
Show file tree
Hide file tree
Showing 41 changed files with 2,947 additions and 2,987 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.5.1
current_version = 0.6.0
commit = True
tag = True
message = 🔖 Bump version: {current_version} → {new_version}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ jobs:
- name: set PY
run: echo "PY=$(python -VV | sha256sum | cut -d' ' -f1)" >> $GITHUB_ENV
- name: Set up poetry cache
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: .venv
key: venv-${{ runner.os }}-${{ env.PY }}-${{ hashFiles('**/poetry.lock') }}
- name: Ensure cache is healthy
if: steps.cache.outputs.cache-hit == 'true'
run: poetry run pip --version >/dev/null 2>&1 || rm -rf .venv
- name: Set up pre-commit cache
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ~/.cache/pre-commit
key: pre-commit-${{ runner.os }}-${{ env.PY }}-${{ hashFiles('.pre-commit-config.yaml') }}
Expand Down
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ repos:
- id: absolufy-imports

- repo: https://github.com/pycqa/isort
rev: 5.12.0
rev: 5.13.2
hooks:
- id: isort
args: ["--profile", "black"]

- repo: https://github.com/psf/black
rev: 23.9.1
rev: 24.2.0
hooks:
- id: black

Expand Down Expand Up @@ -68,7 +68,7 @@ repos:
- id: rst-inline-touching-normal

- repo: https://github.com/python-poetry/poetry
rev: 1.6.0
rev: 1.7.0
hooks:
- id: poetry-check
- id: poetry-lock
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.1
0.6.0
2 changes: 1 addition & 1 deletion ftmq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ftmq.query import Query

__version__ = "0.5.1"
__version__ = "0.6.0"
__all__ = ["Query"]
3 changes: 2 additions & 1 deletion ftmq/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
from functools import cache
from typing import Any, Generator, Iterable, TypeAlias

from anystore.util import clean_dict
from banal import ensure_list
from followthemoney.schema import Schema
from followthemoney.types import registry
from pydantic import BaseModel

from ftmq.enums import Aggregations, Fields, Properties
from ftmq.types import CE, CEGenerator
from ftmq.util import clean_dict, to_numeric
from ftmq.util import to_numeric

Value: TypeAlias = int | float | str
Values: TypeAlias = list[Value]
Expand Down
84 changes: 41 additions & 43 deletions ftmq/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,17 @@

import click
import orjson
from anystore.io import smart_write
from anystore.util import clean_dict
from click_default_group import DefaultGroup

from ftmq.aggregate import aggregate
from ftmq.io import (
apply_datasets,
smart_read,
smart_read_proxies,
smart_write,
smart_write_proxies,
)
from ftmq.io import apply_datasets, smart_read_proxies, smart_write_proxies
from ftmq.model.coverage import Collector
from ftmq.model.dataset import Catalog, Dataset
from ftmq.query import Query
from ftmq.store import get_store
from ftmq.util import clean_dict, parse_unknown_filters
from ftmq.util import parse_unknown_filters


@click.group(cls=DefaultGroup, default="q", default_if_no_args=True)
Expand Down Expand Up @@ -52,10 +48,10 @@ def cli() -> None:
show_default=True,
)
@click.option(
"--coverage-uri",
"--stats-uri",
default=None,
show_default=True,
help="If specified, print coverage information to this uri",
help="If specified, print statistic coverage information to this uri",
)
@click.option(
"--store-dataset",
Expand Down Expand Up @@ -88,7 +84,7 @@ def q(
sort: tuple[str] | None = None,
sort_ascending: bool | None = True,
properties: tuple[str] | None = (),
coverage_uri: str | None = None,
stats_uri: str | None = None,
store_dataset: str | None = None,
sum: tuple[str] | None = (),
min: tuple[str] | None = (),
Expand Down Expand Up @@ -132,14 +128,14 @@ def q(
for func, props in aggs.items():
q = q.aggregate(func, *props, groups=groups)
proxies = smart_read_proxies(input_uri, dataset=store_dataset, query=q)
if coverage_uri:
coverage = Collector()
proxies = coverage.apply(proxies)
if stats_uri:
stats = Collector()
proxies = stats.apply(proxies)
smart_write_proxies(output_uri, proxies, serialize=True, dataset=store_dataset)
if coverage_uri:
coverage = coverage.export()
coverage = orjson.dumps(coverage.dict(), option=orjson.OPT_APPEND_NEWLINE)
smart_write(coverage_uri, coverage)
if stats_uri:
stats = stats.export()
stats = orjson.dumps(stats.dict(), option=orjson.OPT_APPEND_NEWLINE)
smart_write(stats_uri, stats)
if q.aggregator:
result = orjson.dumps(
clean_dict(q.aggregator.result), option=orjson.OPT_APPEND_NEWLINE
Expand Down Expand Up @@ -172,6 +168,23 @@ def apply(
smart_write_proxies(output_uri, proxies, serialize=True)


@cli.group()
def dataset():
pass


@dataset.command("iterate")
@click.option(
"-i", "--input-uri", default="-", show_default=True, help="input file or uri"
)
@click.option(
"-o", "--output-uri", default="-", show_default=True, help="output file or uri"
)
def dataset_iterate(input_uri: str | None = "-", output_uri: str | None = "-"):
dataset = Dataset._from_uri(input_uri)
smart_write_proxies(output_uri, dataset.iterate(), serialize=True)


@cli.group()
def catalog():
pass
Expand All @@ -185,7 +198,7 @@ def catalog():
"-o", "--output-uri", default="-", show_default=True, help="output file or uri"
)
def catalog_iterate(input_uri: str | None = "-", output_uri: str | None = "-"):
catalog = Catalog.from_uri(input_uri)
catalog = Catalog._from_uri(input_uri)
smart_write_proxies(output_uri, catalog.iterate(), serialize=True)


Expand Down Expand Up @@ -269,41 +282,26 @@ def store_iterate(
"-o", "--output-uri", default="-", show_default=True, help="output file or uri"
)
@click.option(
"--coverage",
"--stats",
is_flag=True,
default=False,
show_default=True,
help="Calculate coverage",
help="Calculate stats",
)
def make_dataset(
input_uri: str | None = "-",
output_uri: str | None = "-",
coverage: bool | None = False,
stats: bool | None = False,
):
"""
Convert dataset YAML specification into json
Convert dataset YAML specification into json and optionally calculate statistics
"""
dataset = Dataset.from_string(smart_read(input_uri))
if coverage:
dataset = Dataset._from_uri(input_uri)
if stats:
collector = Collector()
for proxy in dataset.iterate():
collector.collect(proxy)
dataset.coverage = collector.export()
smart_write(output_uri, orjson.dumps(dataset.dict()))


@cli.command("io")
@click.option(
"-i", "--input-uri", default="-", show_default=True, help="input file or uri"
)
@click.option(
"-o", "--output-uri", default="-", show_default=True, help="output file or uri"
)
def io(input_uri: str | None = "-", output_uri: str | None = "-"):
"""
Generic cli wrapper around ftmq.io.smart_open
"""
smart_write(output_uri, smart_read(input_uri))
statistics = collector.collect_many(dataset.iterate())
dataset.apply_stats(statistics)
smart_write(output_uri, dataset.model_dump_json().encode())


@cli.command("aggregate")
Expand Down
5 changes: 2 additions & 3 deletions ftmq/dedupe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from functools import cache

from anystore import smart_stream
from nomenklatura.entity import CompositeEntity
from nomenklatura.resolver import Edge, Resolver

Expand All @@ -9,12 +10,10 @@

@cache
def get_resolver(uri: str | None = None) -> Resolver[CompositeEntity]:
from ftmq.io import smart_read

resolver = Resolver()
if not uri:
return resolver
for ix, edge in enumerate(smart_read(uri, stream=True)):
for ix, edge in enumerate(smart_stream(uri)):
edge = Edge.from_line(edge)
resolver._register(edge)
if ix and ix % 10_000 == 0:
Expand Down
9 changes: 6 additions & 3 deletions ftmq/enums.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from collections.abc import Iterable
from enum import Enum, EnumMeta
from typing import Any
from typing import Any, Iterable

from followthemoney import model
from nomenklatura.dataset.coverage import DataCoverage
Expand Down Expand Up @@ -43,6 +42,10 @@ def name(self, name: str):


Schemata = StrEnum("Schemata", [k for k in model.schemata.keys()])
Things = StrEnum("Things", [k for k, s in model.schemata.items() if s.is_a("Thing")])
Intervals = StrEnum(
"Intervals", [k for k, s in model.schemata.items() if s.is_a("Interval")]
)
Properties = StrEnum("Properties", {p.name for p in model.properties})
PropertyTypes = StrEnum("PropertyTypes", {p.type for p in model.properties})
PropertyTypesMap = Enum("PropertyTypesMap", {p.name: p.type for p in model.properties})
Expand All @@ -67,7 +70,7 @@ def name(self, name: str):
"endswith",
],
)
Frequencies = StrEnum("Frequencies", [f for f in DataCoverage.FREQUENCIES])
Frequencies = StrEnum("Frequencies", DataCoverage.FREQUENCIES)
Aggregations = StrEnum("Aggregations", ("min", "max", "sum", "avg", "count"))
Fields = StrEnum("Fields", ["id", "dataset", "schema", "year"])

Expand Down
90 changes: 2 additions & 88 deletions ftmq/io.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import contextlib
import logging
import sys
from collections.abc import Iterable
from typing import Any, Literal

import orjson
from anystore.io import smart_open, smart_stream
from banal import is_listish
from fsspec import open
from nomenklatura.entity import CE, CompositeEntity
from nomenklatura.util import PathLike

Expand All @@ -20,89 +17,6 @@
DEFAULT_MODE = "rb"


def _get_sysio(mode: str | None = DEFAULT_MODE) -> Literal[sys.stdin, sys.stdout]:
if mode.startswith("r"):
return sys.stdin
return sys.stdout


class SmartHandler:
def __init__(
self,
uri: Any,
*args,
**kwargs,
) -> None:
if not uri:
raise ValueError("Missing uri")
self.uri = str(uri)
self.args = args
kwargs["mode"] = kwargs.get("mode", DEFAULT_MODE)
self.sys_io = _get_sysio(kwargs["mode"])
if kwargs["mode"].endswith("b"):
self.sys_io = self.sys_io.buffer
self.kwargs = kwargs
self.is_buffer = self.uri == "-"
self.handler = None

def open(self):
if self.is_buffer:
self.handler = self.sys_io
else:
handler = open(self.uri, *self.args, **self.kwargs)
self.handler = handler.open()
return self.handler

def close(self):
if not self.is_buffer:
self.handler.close()

def __enter__(self):
return self.open()

def __exit__(self, *args, **kwargs) -> None:
self.close()


@contextlib.contextmanager
def smart_open(
uri: Any,
mode: str | None = None,
*args,
**kwargs,
):
if mode is not None:
kwargs["mode"] = mode
else:
kwargs["mode"] = kwargs.get("mode", DEFAULT_MODE)
handler = SmartHandler(uri, *args, **kwargs)
try:
yield handler.open()
finally:
handler.close()


def _smart_stream(uri, *args, **kwargs) -> Any:
with smart_open(uri, *args, **kwargs) as fh:
while line := fh.readline():
yield line


def smart_read(uri, *args, **kwargs) -> Any:
stream = kwargs.pop("stream", False)
if stream:
return _smart_stream(uri, *args, **kwargs)

with smart_open(uri, *args, **kwargs) as fh:
return fh.read()


def smart_write(uri, content: bytes | str, *args, **kwargs) -> Any:
kwargs["mode"] = kwargs.get("mode", "wb")
with smart_open(uri, *args, **kwargs) as fh:
fh.write(content)


def smart_get_store(uri: PathLike, **kwargs) -> Store | None:
try:
return get_store(uri, **kwargs)
Expand All @@ -128,7 +42,7 @@ def smart_read_proxies(
yield from view.entities(query)
return

lines = smart_read(uri, stream=True)
lines = smart_stream(uri)
lines = (orjson.loads(line) for line in lines)
if serialize or query:
q = query or Query()
Expand Down
Loading… (the remaining changed files were not loaded in this capture)

0 comments on commit 26fd016

Please sign in to comment.