
Merge pull request #42 from lensacom/dev
Fixed dict vectorizer check_rdd; DictRDD transform dtype issue
kszucs committed Jun 24, 2015
2 parents 9274ba1 + 3d519bd commit 6ca0df3
Showing 17 changed files with 111 additions and 41 deletions.
6 changes: 5 additions & 1 deletion README.rst
@@ -1,7 +1,7 @@
Sparkit-learn
=============

|Build Status| |PyPi|
|Build Status| |PyPi| |Gitter|

**PySpark + Scikit-learn = Sparkit-learn**

@@ -448,3 +448,7 @@ Special thanks
:target: https://travis-ci.org/lensacom/sparkit-learn
.. |PyPi| image:: https://img.shields.io/pypi/v/sparkit-learn.svg
:target: https://pypi.python.org/pypi/sparkit-learn
.. |Gitter| image:: https://badges.gitter.im/Join%20Chat.svg
:alt: Join the chat at https://gitter.im/lensacom/sparkit-learn
:target: https://gitter.im/lensacom/sparkit-learn?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge

8 changes: 4 additions & 4 deletions setup.py
@@ -2,8 +2,7 @@

import sys

from setuptools import setup
from setuptools import find_packages
from setuptools import find_packages, setup


def is_numpy_installed():
@@ -17,7 +16,7 @@ def is_numpy_installed():
def setup_package():
metadata = dict(
name='sparkit-learn',
version="0.2.4",
version='0.2.5',
description='Scikit-learn on PySpark',
author='Krisztian Szucs, Andras Fulop',
author_email='[email protected], [email protected]',
@@ -33,7 +32,8 @@ def setup_package():
if is_numpy_installed() is False:
raise ImportError("Numerical Python (NumPy) is not installed.\n"
"sparkit-learn requires NumPy.\n"
"Installation instructions are available on scikit-learn website: "
"Installation instructions are available on "
"scikit-learn website: "
"http://scikit-learn.org/stable/install.html\n")

setup(**metadata)
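
The body of is_numpy_installed is elided in the hunk above. A minimal sketch of the probe it presumably performs — an assumption, not necessarily the repository's exact code:

def is_numpy_installed():
    # Try importing NumPy; setup_package() raises the descriptive
    # ImportError shown above when this returns False.
    try:
        import numpy  # noqa
        return True
    except ImportError:
        return False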
10 changes: 3 additions & 7 deletions splearn/feature_extraction/dict_vectorizer.py
@@ -2,13 +2,13 @@

import numpy as np
import scipy.sparse as sp
from sklearn.feature_extraction import DictVectorizer
from pyspark import AccumulatorParam
from sklearn.externals import six
from sklearn.feature_extraction import DictVectorizer

from ..base import SparkBroadcasterMixin
from ..rdd import DictRDD
from pyspark import AccumulatorParam
from ..utils.validation import check_rdd
from ..base import SparkBroadcasterMixin


class SparkDictVectorizer(DictVectorizer, SparkBroadcasterMixin):
@@ -87,7 +87,6 @@ def fit(self, Z):
self
"""
X = Z[:, 'X'] if isinstance(Z, DictRDD) else Z
check_rdd(X, (np.ndarray,))

"""Create vocabulary
"""
@@ -142,9 +141,6 @@ def transform(self, Z):
Z : transformed, containing {array, sparse matrix}
Feature vectors; always 2-d.
"""
X = Z[:, 'X'] if isinstance(Z, DictRDD) else Z
check_rdd(X, (np.ndarray, sp.spmatrix))

mapper = self.broadcast(super(SparkDictVectorizer, self).transform,
Z.context)
dtype = sp.spmatrix if self.sparse else np.ndarray
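
For context on the check_rdd fix named in the commit message: check_rdd is a thin wrapper around the check_rdd_dtype predicate (its docstring appears under splearn/utils/validation.py at the end of this diff) that raises TypeError on a dtype mismatch. A minimal sketch of that contract — the exact error message is an assumption:

def check_rdd(rdd, expected_dtype):
    # Delegate to the check_rdd_dtype predicate and raise on mismatch,
    # per the docstring in splearn/utils/validation.py below.
    if not check_rdd_dtype(rdd, expected_dtype):
        raise TypeError("expected dtype {0}, got {1}".format(
            expected_dtype, getattr(rdd, 'dtype', type(rdd))))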
2 changes: 1 addition & 1 deletion splearn/feature_extraction/tests/test_dict_vectorizer.py
@@ -1,5 +1,5 @@
import scipy.sparse as sp
import numpy as np
import scipy.sparse as sp
from sklearn.feature_extraction import DictVectorizer
from splearn.feature_extraction import SparkDictVectorizer
from splearn.rdd import ArrayRDD
5 changes: 3 additions & 2 deletions splearn/feature_extraction/tests/test_text.py
@@ -1,13 +1,14 @@
import scipy.sparse as sp
import numpy as np
import scipy.sparse as sp
from sklearn.feature_extraction.text import (CountVectorizer,
HashingVectorizer,
TfidfTransformer)
from splearn.feature_extraction.text import (SparkCountVectorizer,
SparkHashingVectorizer,
SparkTfidfTransformer)
from splearn.utils.testing import (SplearnTestCase, assert_array_almost_equal,
assert_array_equal, assert_equal, assert_true)
assert_array_equal, assert_equal,
assert_true)
from splearn.utils.validation import check_rdd_dtype


2 changes: 1 addition & 1 deletion splearn/feature_selection/tests/test_variance_threshold.py
@@ -80,6 +80,6 @@ def test_same_transform_with_treshold(self):
result_dist.toarray())

result_dist = dist.fit_transform(Z_rdd)[:, 'X']
assert_true(check_rdd_dtype(result_dist, (sp.spmatrix,)))
assert_true(check_rdd_dtype(result_dist, (sp.spmatrix,)))
assert_array_almost_equal(result_local.toarray(),
result_dist.toarray())
2 changes: 1 addition & 1 deletion splearn/feature_selection/variance_threshold.py
@@ -5,8 +5,8 @@
from sklearn.utils.sparsefuncs import mean_variance_axis

from ..rdd import DictRDD
from .base import SparkSelectorMixin
from ..utils.validation import check_rdd
from .base import SparkSelectorMixin


class SparkVarianceThreshold(VarianceThreshold, SparkSelectorMixin):
3 changes: 1 addition & 2 deletions splearn/linear_model/base.py
@@ -1,8 +1,7 @@
# encoding: utf-8

import scipy.sparse as sp
import numpy as np

import scipy.sparse as sp
from sklearn.base import copy
from sklearn.linear_model.base import LinearRegression

2 changes: 1 addition & 1 deletion splearn/linear_model/logistic.py
@@ -4,8 +4,8 @@
import scipy.sparse as sp
from sklearn.linear_model import LogisticRegression

from .base import SparkLinearModelMixin
from ..utils.validation import check_rdd
from .base import SparkLinearModelMixin


class SparkLogisticRegression(LogisticRegression, SparkLinearModelMixin):
2 changes: 1 addition & 1 deletion splearn/linear_model/stochastic_gradient.py
@@ -4,8 +4,8 @@
import scipy.sparse as sp
from sklearn.linear_model import SGDClassifier

from .base import SparkLinearModelMixin
from ..utils.validation import check_rdd
from .base import SparkLinearModelMixin


class SparkSGDClassifier(SGDClassifier, SparkLinearModelMixin):
3 changes: 2 additions & 1 deletion splearn/linear_model/tests/test_base.py
@@ -1,7 +1,8 @@
import numpy as np
from sklearn.linear_model import LinearRegression
from splearn.linear_model import SparkLinearRegression
from splearn.utils.testing import SplearnTestCase, assert_array_almost_equal, assert_true
from splearn.utils.testing import (SplearnTestCase, assert_array_almost_equal,
assert_true)
from splearn.utils.validation import check_rdd_dtype


8 changes: 2 additions & 6 deletions splearn/rdd.py
@@ -621,7 +621,7 @@ def zipper(a_b):
raise ValueError("Columns and dtype lengths must be equal!")

self.columns = tuple(columns)
super(DictRDD, self).__init__(rdd, bsize, dtype, noblock)
super(DictRDD, self).__init__(rdd, bsize, tuple(dtype), noblock)

def _block(self, rdd, bsize, dtype):
"""Execute the blocking process on the given rdd.
@@ -739,18 +739,14 @@ def transform(self, fn, column=None, dtype=None):
if column is None:
indices = list(range(len(self.columns)))
else:
# TODO: find a better way
#if not hasattr(column, '__iter__'):
if not type(column) in (list, tuple):
column = [column]
indices = [self.columns.index(c) for c in column]

if dtype is not None:
# TODO: find a better way!
#if not hasattr(dtype, '__iter__'):
if not type(dtype) in (list, tuple):
dtype = [dtype]
dtypes = [dtype[i] if i in indices else t
dtypes = [dtype[indices.index(i)] if i in indices else t
for i, t in enumerate(self.dtype)]

def mapper(values):
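
The one-line change to transform above is the "DictRDD transform dtype issue" from the commit message: when only a subset of columns is transformed, dtype holds one entry per selected column, so it must be indexed by the column's position within indices rather than by its absolute index. A worked example with hypothetical values:

import numpy as np

# A DictRDD with dtype (np.ndarray, np.ndarray), transforming only
# column 1 with dtype=list:
indices = [1]   # absolute index of the selected column
dtype = [list]  # one entry per *selected* column

# The old lookup dtype[i] would evaluate dtype[1] and raise IndexError
# (len(dtype) == 1); the new lookup resolves to dtype[indices.index(1)],
# i.e. dtype[0] == list, as intended.
dtypes = [dtype[indices.index(i)] if i in indices else t
          for i, t in enumerate((np.ndarray, np.ndarray))]
assert dtypes == [np.ndarray, list]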
1 change: 1 addition & 0 deletions splearn/svm/classes.py
@@ -4,6 +4,7 @@
import scipy.sparse as sp
from sklearn.svm import LinearSVC
from splearn.linear_model.base import SparkLinearModelMixin

from ..utils.validation import check_rdd


5 changes: 3 additions & 2 deletions splearn/tests/test_naive_bayes.py
@@ -1,7 +1,8 @@
import numpy as np
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from splearn.naive_bayes import SparkGaussianNB, SparkMultinomialNB
from splearn.utils.testing import SplearnTestCase, assert_array_almost_equal, assert_true
from splearn.utils.testing import (SplearnTestCase, assert_array_almost_equal,
assert_true)
from splearn.utils.validation import check_rdd_dtype


@@ -32,5 +33,5 @@ def test_same_prediction(self):
y_local = local.fit(X, y).predict(X)
y_dist = dist.fit(Z, classes=np.unique(y)).predict(Z[:, 'X'])

assert_true(check_rdd_dtype(y_dist, (np.ndarray,)))
assert_true(check_rdd_dtype(y_dist, (np.ndarray,)))
assert_array_almost_equal(y_local, y_dist.toarray())
84 changes: 77 additions & 7 deletions splearn/tests/test_rdd.py
@@ -1,13 +1,14 @@
import numpy as np
import scipy.sparse as sp
from pyspark import RDD
from splearn.rdd import ArrayRDD, SparseRDD, BlockRDD, DictRDD, block
from splearn.rdd import ArrayRDD, BlockRDD, DictRDD, SparseRDD, block
from splearn.utils.testing import (SplearnTestCase, assert_almost_equal,
assert_array_almost_equal,
assert_array_equal, assert_equal,
assert_is_instance,
assert_multiple_tuples_equal, assert_raises,
assert_true, assert_tuple_equal)
from splearn.utils.validation import check_rdd_dtype


class TestBlocking(SplearnTestCase):
@@ -633,6 +634,24 @@ def test_initialization(self):
assert_is_instance(DictRDD(rdd, bsize=None), DictRDD)
assert_is_instance(DictRDD(rdd), BlockRDD)

def test_creation_from_zipped_rdd(self):
x = np.arange(80).reshape((40, 2))
y = range(40)
x_rdd = self.sc.parallelize(x, 4)
y_rdd = self.sc.parallelize(y, 4)
zipped_rdd = x_rdd.zip(y_rdd)

expected = (np.arange(20).reshape(10, 2), tuple(range(10)))

rdd = DictRDD(zipped_rdd)
assert_tuple_equal(rdd.first(), expected)
rdd = DictRDD(zipped_rdd, columns=('x', 'y'))
assert_tuple_equal(rdd.first(), expected)
rdd = DictRDD(zipped_rdd, dtype=(np.ndarray, list))
first = rdd.first()
assert_tuple_equal(first, expected)
assert_is_instance(first[1], list)

def test_creation_from_rdds(self):
x = np.arange(80).reshape((40, 2))
y = np.arange(40)
@@ -676,6 +695,28 @@ def test_creation_from_blocked_rdds(self):
assert_tuple_equal(first, expected)
assert_is_instance(first[2], list)

def test_auto_dtype(self):
x = np.arange(80).reshape((40, 2))
y = tuple(range(40))
z = list(range(40))
x_rdd = self.sc.parallelize(x, 4)
y_rdd = self.sc.parallelize(y, 4)
z_rdd = self.sc.parallelize(z, 4)

expected = (np.arange(20).reshape(10, 2), tuple(range(10)),
list(range(10)))

rdd = DictRDD([x_rdd, y_rdd, z_rdd])
assert_tuple_equal(rdd.first(), expected)
assert_equal(rdd.dtype, (np.ndarray, tuple, tuple))
assert_true(check_rdd_dtype(rdd, {0: np.ndarray, 1: tuple, 2: tuple}))

rdd = DictRDD([x_rdd, y_rdd, z_rdd], columns=('x', 'y', 'z'))
assert_tuple_equal(rdd.first(), expected)
assert_equal(rdd.dtype, (np.ndarray, tuple, tuple))
assert_true(check_rdd_dtype(rdd, {'x': np.ndarray, 'y': tuple,
'z': tuple}))

def test_get_single_tuple(self):
x, y = np.arange(80).reshape((40, 2)), np.arange(40)
x_rdd = self.sc.parallelize(x, 2)
@@ -772,16 +813,16 @@ def test_transform(self):
X = DictRDD(rdd1.zip(rdd2), bsize=5)

X1 = [(x[0], x[1] ** 2) for x in X.collect()]
X2 = X.transform(lambda a, b: (a, b ** 2)).collect()
assert_multiple_tuples_equal(X1, X2)
X2 = X.transform(lambda a, b: (a, b ** 2))
assert_multiple_tuples_equal(X1, X2.collect())

X1 = [(x[0], x[1] ** 2) for x in X.collect()]
X2 = X.transform(lambda x: x ** 2, column=1).collect()
assert_multiple_tuples_equal(X1, X2)
X2 = X.transform(lambda x: x ** 2, column=1)
assert_multiple_tuples_equal(X1, X2.collect())

X1 = [(x[0] ** 2, x[1]) for x in X.collect()]
X2 = X.transform(lambda x: x ** 2, column=0).collect()
assert_multiple_tuples_equal(X1, X2)
X2 = X.transform(lambda x: x ** 2, column=0)
assert_multiple_tuples_equal(X1, X2.collect())

X1 = [(x[0] ** 2, x[1] ** 0.5) for x in X.collect()]
X2 = X.transform(lambda a, b: (a ** 2, b ** 0.5), column=[0, 1])
@@ -790,3 +831,32 @@
X1 = [(x[0] ** 2, x[1] ** 0.5) for x in X.collect()]
X2 = X.transform(lambda b, a: (b ** 0.5, a ** 2), column=[1, 0])
assert_multiple_tuples_equal(X1, X2.collect())

def test_transform_with_dtype(self):
data1 = np.arange(400).reshape((100, 4))
data2 = np.arange(200).reshape((100, 2))
rdd1 = self.sc.parallelize(data1, 4)
rdd2 = self.sc.parallelize(data2, 4)

X = DictRDD(rdd1.zip(rdd2), bsize=5)

X2 = X.transform(lambda x: x ** 2, column=0)
assert_equal(X2.dtype, (np.ndarray, np.ndarray))

X2 = X.transform(lambda x: tuple((x ** 2).tolist()), column=0,
dtype=tuple)
assert_equal(X2.dtype, (tuple, np.ndarray))
assert_true(check_rdd_dtype(X2, {0: tuple, 1: np.ndarray}))

X2 = X.transform(lambda x: x ** 2, column=1, dtype=list)
assert_equal(X2.dtype, (np.ndarray, list))
assert_true(check_rdd_dtype(X2, {0: np.ndarray, 1: list}))

X2 = X.transform(lambda a, b: (a ** 2, (b ** 0.5).tolist()),
column=[0, 1], dtype=(np.ndarray, list))
assert_true(check_rdd_dtype(X2, {0: np.ndarray, 1: list}))

X2 = X.transform(lambda b, a: ((b ** 0.5).tolist(), a ** 2),
column=[1, 0], dtype=(list, np.ndarray))
assert_equal(X2.dtype, (np.ndarray, list))
assert_true(check_rdd_dtype(X2, {0: np.ndarray, 1: list}))
6 changes: 3 additions & 3 deletions splearn/utils/tests/test_utils.py
@@ -1,9 +1,10 @@
import numpy as np
import scipy.sparse as sp
from splearn.utils.validation import check_rdd_dtype, check_rdd
from sklearn.utils.testing import assert_false, assert_raises, assert_true
from splearn.rdd import ArrayRDD, DictRDD, SparseRDD
from sklearn.utils.testing import assert_false, assert_true, assert_raises
from splearn.utils.testing import SplearnTestCase
from splearn.utils.validation import check_rdd, check_rdd_dtype


class TestUtilities(SplearnTestCase):

@@ -53,4 +54,3 @@ def test_check_rdd(self):
assert_raises(TypeError, check_rdd, (dict_rdd, {'X': (array, spmat)}))
assert_raises(TypeError, check_rdd, (dict_rdd, (tuple,)))
assert_raises(TypeError, check_rdd, (np.arange(20), (array,)))

3 changes: 2 additions & 1 deletion splearn/utils/validation.py
@@ -1,4 +1,4 @@
from splearn.rdd import DictRDD, BlockRDD
from splearn.rdd import BlockRDD, DictRDD


def check_rdd_dtype(rdd, expected_dtype):
@@ -38,6 +38,7 @@ def check_rdd_dtype(rdd, expected_dtype):

return rdd.dtype in expected_dtype


def check_rdd(rdd, expected_dtype):
"""Wrapper function to check_rdd_dtype. Raises TypeError in case of dtype
mismatch.
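
The new tests earlier in this diff exercise both calling conventions of check_rdd_dtype: a tuple of accepted types for homogeneous RDDs, and a dict keyed by column name or index for DictRDDs. A small usage sketch, assuming a live SparkContext named sc:

import numpy as np
from splearn.rdd import ArrayRDD
from splearn.utils.validation import check_rdd_dtype

# Dense blocks: the tuple form returns True when rdd.dtype is accepted.
data = ArrayRDD(sc.parallelize(np.arange(20).reshape(10, 2), 2))
assert check_rdd_dtype(data, (np.ndarray,))

# For a DictRDD, a dict keyed by column also works, e.g.
# check_rdd_dtype(dict_rdd, {'x': np.ndarray, 'y': tuple})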
