-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathtest_custom_provider.py
39 lines (28 loc) · 1.04 KB
/
test_custom_provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import os
from datetime import datetime
from feast import FeatureStore
from basic_feature_repo.repo import driver, driver_hourly_stats_view
def test_end_to_end():
    """Apply the repo, materialize, read one online feature, then tear down.

    Registers ``driver`` and ``driver_hourly_stats_view`` with the feature
    store rooted at ``basic_feature_repo/``, materializes incrementally up to
    the current time, and asserts that the online ``conv_rate`` value returned
    for ``driver_id`` 1001 is positive.

    Raises:
        AssertionError: if the retrieved ``conv_rate`` is not greater than 0.
    """
    fs = FeatureStore("basic_feature_repo/")

    # apply repository
    fs.apply([driver, driver_hourly_stats_view])

    try:
        # load data into online store
        fs.materialize_incremental(end_date=datetime.now())

        # Read features from online store
        feature_vector = fs.get_online_features(
            features=["driver_hourly_stats:conv_rate"],
            entity_rows=[{"driver_id": 1001}],
        ).to_dict()
        conv_rate = feature_vector["conv_rate"][0]

        assert conv_rate > 0
    finally:
        # tear down feature store even when a step above fails, so a failing
        # run does not leave the applied store behind for later runs
        fs.teardown()
def test_cli():
    """Run ``feast apply`` through the shell and check its captured stdout.

    The command's standard output is redirected to a file named ``output`` in
    the current working directory. The test passes only when that file
    contains the expected marker message; otherwise it raises.

    Raises:
        Exception: if the marker message is absent from ``output``.
    """
    expected_marker = "Launching custom streaming jobs is pretty easy"
    apply_command = (
        "PYTHONPATH=$PYTHONPATH:/$(pwd) feast -c basic_feature_repo apply > output"
    )

    # The exit status is intentionally not inspected; success is judged solely
    # by the marker text appearing in the redirected output.
    os.system(apply_command)

    with open("output", "r") as captured_file:
        captured_text = captured_file.read()

    if expected_marker in captured_text:
        return

    raise Exception(
        'Failed to successfully use provider from CLI. See "output" for more details.'
    )