Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: publish and dev versions #359

Merged
merged 6 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ jobs:
Pipeline:
if: github.ref == 'refs/heads/master'

runs-on: ubuntu-22.04
container: quintoandar/python-3-7-java
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ VERSION := $(shell grep __version__ setup.py | head -1 | cut -d \" -f2 | cut -d
.PHONY: environment
## create virtual environment for butterfree
environment:
@pyenv install -s 3.7.13
@pyenv virtualenv 3.7.13 butterfree
@pyenv install -s 3.9.19
@pyenv virtualenv 3.9.19 butterfree
@pyenv local butterfree
@PYTHONPATH=. python -m pip install --upgrade pip

Expand Down
2 changes: 1 addition & 1 deletion butterfree/_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from butterfree._cli import migrate

app = typer.Typer()
app = typer.Typer(no_args_is_help=True)
app.add_typer(migrate.app, name="migrate")

if __name__ == "__main__":
Expand Down
6 changes: 4 additions & 2 deletions butterfree/_cli/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from butterfree.migrations.database_migration import ALLOWED_DATABASE
from butterfree.pipelines import FeatureSetPipeline

app = typer.Typer(help="Apply the automatic migrations in a database.")
app = typer.Typer(
help="Apply the automatic migrations in a database.", no_args_is_help=True
)

logger = __logger("migrate", True)

Expand Down Expand Up @@ -89,7 +91,7 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
instances.add(value)

logger.info("Creating instances...")
return set(value() for value in instances)
return set(value() for value in instances) # type: ignore


PATH = typer.Argument(
Expand Down
1 change: 1 addition & 0 deletions butterfree/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds connection clients."""

from butterfree.clients.abstract_client import AbstractClient
from butterfree.clients.cassandra_client import CassandraClient
from butterfree.clients.spark_client import SparkClient
Expand Down
5 changes: 3 additions & 2 deletions butterfree/clients/abstract_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Abstract class for database clients."""

from abc import ABC, abstractmethod
from typing import Any
from typing import Any, Optional


class AbstractClient(ABC):
Expand All @@ -25,7 +26,7 @@ def sql(self, query: str) -> Any:
pass

@abstractmethod
def get_schema(self, table: str, database: str = None) -> Any:
def get_schema(self, table: str, database: Optional[str] = None) -> Any:
"""Returns desired table schema.

Attributes:
Expand Down
5 changes: 4 additions & 1 deletion butterfree/clients/cassandra_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""CassandraClient entity."""

from ssl import CERT_REQUIRED, PROTOCOL_TLSv1
from typing import Dict, List, Optional

Expand Down Expand Up @@ -102,7 +103,9 @@ def sql(self, query: str) -> ResponseFuture:
"""
return self.conn.execute(query)

def get_schema(self, table: str, database: str = None) -> List[Dict[str, str]]:
def get_schema(
self, table: str, database: Optional[str] = None
) -> List[Dict[str, str]]:
"""Returns desired table schema.

Attributes:
Expand Down
25 changes: 16 additions & 9 deletions butterfree/clients/spark_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def read(

return df_reader.format(format).load(path=path, **options) # type: ignore

def read_table(self, table: str, database: str = None) -> DataFrame:
def read_table(self, table: str, database: Optional[str] = None) -> DataFrame:
"""Use the SparkSession.read interface to read a metastore table.

Args:
Expand Down Expand Up @@ -179,9 +179,9 @@ def write_table(
database: Optional[str],
table_name: str,
path: str,
format_: str = None,
mode: str = None,
partition_by: List[str] = None,
format_: Optional[str] = None,
mode: Optional[str] = None,
partition_by: Optional[List[str]] = None,
**options: Any,
) -> None:
"""Receive a spark DataFrame and write it as a table in metastore.
Expand Down Expand Up @@ -231,7 +231,10 @@ def create_temporary_view(dataframe: DataFrame, name: str) -> Any:
return dataframe.writeStream.format("memory").queryName(name).start()

def add_table_partitions(
self, partitions: List[Dict[str, Any]], table: str, database: str = None
self,
partitions: List[Dict[str, Any]],
table: str,
database: Optional[str] = None,
) -> None:
"""Add partitions to an existing table.

Expand Down Expand Up @@ -259,9 +262,11 @@ def add_table_partitions(
key_values_expr = [
", ".join(
[
"{} = {}".format(k, v)
if not isinstance(v, str)
else "{} = '{}'".format(k, v)
(
"{} = {}".format(k, v)
if not isinstance(v, str)
else "{} = '{}'".format(k, v)
)
for k, v in partition.items()
]
)
Expand Down Expand Up @@ -314,7 +319,9 @@ def _convert_schema(self, schema: DataFrame) -> List[Dict[str, str]]:

return converted_schema

def get_schema(self, table: str, database: str = None) -> List[Dict[str, str]]:
def get_schema(
self, table: str, database: Optional[str] = None
) -> List[Dict[str, str]]:
"""Returns desired table schema.

Attributes:
Expand Down
25 changes: 13 additions & 12 deletions butterfree/configs/db/cassandra_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds configurations to read and write with Spark to Cassandra DB."""

from typing import Any, Dict, List, Optional

from butterfree.configs import environment
Expand Down Expand Up @@ -32,18 +33,18 @@ class CassandraConfig(AbstractWriteConfig):

def __init__(
self,
username: str = None,
password: str = None,
host: str = None,
keyspace: str = None,
mode: str = None,
format_: str = None,
stream_processing_time: str = None,
stream_output_mode: str = None,
stream_checkpoint_path: str = None,
read_consistency_level: str = None,
write_consistency_level: str = None,
local_dc: str = None,
username: Optional[str] = None,
password: Optional[str] = None,
host: Optional[str] = None,
keyspace: Optional[str] = None,
mode: Optional[str] = None,
format_: Optional[str] = None,
stream_processing_time: Optional[str] = None,
stream_output_mode: Optional[str] = None,
stream_checkpoint_path: Optional[str] = None,
read_consistency_level: Optional[str] = None,
write_consistency_level: Optional[str] = None,
local_dc: Optional[str] = None,
):
self.username = username
self.password = password
Expand Down
17 changes: 9 additions & 8 deletions butterfree/configs/db/kafka_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds configurations to read and write with Spark to Kafka."""

from typing import Any, Dict, List, Optional

from butterfree.configs import environment
Expand All @@ -25,13 +26,13 @@ class KafkaConfig(AbstractWriteConfig):

def __init__(
self,
kafka_topic: str = None,
kafka_connection_string: str = None,
mode: str = None,
format_: str = None,
stream_processing_time: str = None,
stream_output_mode: str = None,
stream_checkpoint_path: str = None,
kafka_topic: Optional[str] = None,
kafka_connection_string: Optional[str] = None,
mode: Optional[str] = None,
format_: Optional[str] = None,
stream_processing_time: Optional[str] = None,
stream_output_mode: Optional[str] = None,
stream_checkpoint_path: Optional[str] = None,
):
self.kafka_topic = kafka_topic
self.kafka_connection_string = kafka_connection_string
Expand Down Expand Up @@ -147,4 +148,4 @@ def translate(self, schema: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
Kafka schema.

"""
pass
return [{}]
8 changes: 4 additions & 4 deletions butterfree/configs/db/metastore_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class MetastoreConfig(AbstractWriteConfig):

def __init__(
self,
path: str = None,
mode: str = None,
format_: str = None,
file_system: str = None,
path: Optional[str] = None,
mode: Optional[str] = None,
format_: Optional[str] = None,
file_system: Optional[str] = None,
):
self.path = path
self.mode = mode
Expand Down
5 changes: 4 additions & 1 deletion butterfree/configs/environment.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds functions for managing the running environment."""

import os
from typing import Optional

Expand Down Expand Up @@ -34,7 +35,9 @@ def __init__(self, variable_name: str):
)


def get_variable(variable_name: str, default_value: str = None) -> Optional[str]:
def get_variable(
variable_name: str, default_value: Optional[str] = None
) -> Optional[str]:
"""Gets an environment variable.

The variable comes from its explicitly declared value in the running
Expand Down
1 change: 1 addition & 0 deletions butterfree/constants/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds constant attributes that are common for Butterfree."""

from butterfree.constants.data_type import DataType

__all__ = ["DataType"]
1 change: 1 addition & 0 deletions butterfree/constants/migrations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Migrations' Constants."""

from butterfree.constants import columns

PARTITION_BY = [
Expand Down
1 change: 1 addition & 0 deletions butterfree/dataframe_service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Dataframe optimization components regarding Butterfree."""

from butterfree.dataframe_service.incremental_strategy import IncrementalStrategy
from butterfree.dataframe_service.partitioning import extract_partition_values
from butterfree.dataframe_service.repartition import repartition_df, repartition_sort_df
Expand Down
17 changes: 13 additions & 4 deletions butterfree/dataframe_service/incremental_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

from typing import Optional

from pyspark.sql import DataFrame


Expand All @@ -18,7 +20,7 @@ class IncrementalStrategy:
filter can properly work with the defined upper and lower bounds.
"""

def __init__(self, column: str = None):
def __init__(self, column: Optional[str] = None):
self.column = column

def from_milliseconds(self, column_name: str) -> IncrementalStrategy:
Expand All @@ -32,7 +34,9 @@ def from_milliseconds(self, column_name: str) -> IncrementalStrategy:
"""
return IncrementalStrategy(column=f"from_unixtime({column_name}/ 1000.0)")

def from_string(self, column_name: str, mask: str = None) -> IncrementalStrategy:
def from_string(
self, column_name: str, mask: Optional[str] = None
) -> IncrementalStrategy:
"""Create a column expression from ts column defined as a simple string.

Args:
Expand Down Expand Up @@ -66,7 +70,9 @@ def from_year_month_day_partitions(
f"'-', string({day_column}))"
)

def get_expression(self, start_date: str = None, end_date: str = None) -> str:
def get_expression(
self, start_date: Optional[str] = None, end_date: Optional[str] = None
) -> str:
"""Get the incremental filter expression using the defined dates.

Both arguments can be set to define a specific date interval, but it's
Expand Down Expand Up @@ -95,7 +101,10 @@ def get_expression(self, start_date: str = None, end_date: str = None) -> str:
return f"date({self.column}) <= date('{end_date}')"

def filter_with_incremental_strategy(
self, dataframe: DataFrame, start_date: str = None, end_date: str = None
self,
dataframe: DataFrame,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> DataFrame:
"""Filters the dataframe according to the date boundaries.

Expand Down
13 changes: 7 additions & 6 deletions butterfree/dataframe_service/repartition.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module where there are repartition methods."""
from typing import List

from typing import List, Optional

from pyspark.sql.dataframe import DataFrame

Expand All @@ -10,7 +11,7 @@


def _num_partitions_definition(
num_processors: int = None, num_partitions: int = None
num_processors: Optional[int] = None, num_partitions: Optional[int] = None
) -> int:
num_partitions = (
num_processors * PARTITION_PROCESSOR_RATIO
Expand All @@ -24,8 +25,8 @@ def _num_partitions_definition(
def repartition_df(
dataframe: DataFrame,
partition_by: List[str],
num_partitions: int = None,
num_processors: int = None,
num_partitions: Optional[int] = None,
num_processors: Optional[int] = None,
) -> DataFrame:
"""Partition the DataFrame.

Expand All @@ -47,8 +48,8 @@ def repartition_sort_df(
dataframe: DataFrame,
partition_by: List[str],
order_by: List[str],
num_processors: int = None,
num_partitions: int = None,
num_processors: Optional[int] = None,
num_partitions: Optional[int] = None,
) -> DataFrame:
"""Partition and Sort the DataFrame.

Expand Down
1 change: 1 addition & 0 deletions butterfree/extract/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""The Source Component of a Feature Set."""

from butterfree.extract.source import Source

__all__ = ["Source"]
1 change: 1 addition & 0 deletions butterfree/extract/pre_processing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Pre Processing Components regarding Readers."""

from butterfree.extract.pre_processing.explode_json_column_transform import (
explode_json_column,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Explode json column for dataframes."""

from pyspark.sql.dataframe import DataFrame, StructType
from pyspark.sql.functions import from_json, get_json_object

Expand Down
1 change: 1 addition & 0 deletions butterfree/extract/pre_processing/filter_transform.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Module where filter DataFrames coming from readers."""

from pyspark.sql.dataframe import DataFrame


Expand Down
Loading
Loading