wip: Make the API for local and distributed arrays match #101

Open · wants to merge 3 commits into master
4 changes: 3 additions & 1 deletion bolt/factory.py
@@ -1,9 +1,11 @@
from bolt.local.construct import ConstructLocal
from bolt.spark.construct import ConstructSpark
from bolt.localspark.construct import ConstructLocalSpark

constructors = [
    ('local', ConstructLocal),
    ('spark', ConstructSpark),
    ('localspark', ConstructLocalSpark)
]

def wrapped(f):
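For context, a minimal sketch of how a `(name, constructor)` registry like this one can be consumed; the actual dispatch logic lives elsewhere in `bolt/factory.py` and is not part of this diff, so the helper below is purely illustrative:

    # hypothetical dispatch helper; bolt's real lookup in factory.py is not shown here
    def lookup(*args, **kwargs):
        for name, constructor in constructors:
            if constructor._argcheck(*args, **kwargs):
                return constructor
        raise ValueError("no matching constructor")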
111 changes: 111 additions & 0 deletions bolt/localspark/__init__.py
@@ -0,0 +1,111 @@
from collections import defaultdict


class LocalRDD(object):
    """
    A simple, inefficient approximation of a locally stored RDD object (a lazy list)
    """
    def __init__(self, kv_list, fcn_to_apply=None):
        """
        Parameters
        ----------
        kv_list : list
            the list of key-value pairs
        fcn_to_apply : list of functions, optional
            the functions to apply lazily, each mapping one element of the list
        """
        self.kv_list = kv_list
        self.lazy_fcn_list = [] if fcn_to_apply is None else fcn_to_apply
        self.cached_list = None

    def map(self, c_func):
        return LocalRDD(self.kv_list, fcn_to_apply=self.lazy_fcn_list + [c_func])

    def mapValues(self, c_func):
        return self.map(lambda kv: (kv[0], c_func(kv[1])))

    def flatMap(self, c_func):
        # 'flatten' is a sentinel recognized by expand below
        return self.map(c_func).map('flatten')

    def mapPartitions(self, c_func):
        """
        Makes an artificial single-element partition for each record and
        executes the function on it
        """
        return self.map(lambda kv: c_func([kv]))

    def partitionBy(self, numPartitions, partitionFunc):
        """
        The pending operations need to be executed before the records can be
        handed off to the partition function
        """
        return LocalPartitionedRDD(self.collect(), partitionFunc)

    def values(self):
        return self.map(lambda kv: kv[1])

    def first(self):
        return self.collect()[0]

    def collect(self):
        if self.cached_list is None:
            self.cached_list = LocalRDD.expand(self)
        return self.cached_list

    @property
    def context(self):
        # making a new one is easier
        return LocalSparkContext()

    @staticmethod
    def expand(curRDD):
        last_list = curRDD.kv_list
        for c_func in curRDD.lazy_fcn_list:
            if c_func == 'flatten':
                out_list = []
                for i in last_list:
                    out_list += i
                last_list = out_list
            else:
                # wrap in list() so the result supports indexing and reuse
                last_list = list(map(c_func, last_list))
        return last_list

class LocalPartitionedRDD(object):

    def __init__(self, kv_list, partitionFunc, part_rdd=None):
        """
        Creates a partitioned RDD which supports the mapPartitions and values
        operations

        Parameters
        ----------
        kv_list : list of (k, v) tuples
            the list of key-value pairs
        partitionFunc : function
            applied to each key to assign it to a distinct partition
        part_rdd : dict, optional
            an already partitioned dataset, with partition ids as keys
            and partition contents as values
        """
        if part_rdd is None:
            self.part_rdd = defaultdict(list)
            for (k, v) in kv_list:
                self.part_rdd[partitionFunc(k)] += [(k, v)]
        else:
            self.part_rdd = part_rdd
        self.partitionFunc = partitionFunc
        self.kv_list = kv_list

    def mapPartitions(self, c_func):
        new_part_values = {}
        new_kv_list = []
        for partName, partValues in self.part_rdd.items():
            new_values = c_func(partValues)
            new_part_values[partName] = new_values
            new_kv_list += new_values
        return LocalPartitionedRDD(new_kv_list, self.partitionFunc, part_rdd=new_part_values)

    def values(self):
        return LocalRDD(self.kv_list, [lambda kv: kv[1]])

class LocalSparkContext(object):

    def parallelize(self, in_list, npartitions=0):
        return LocalRDD(in_list)

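A quick usage sketch of the fake backend above, using only the names defined in this file; the commented outputs are what `expand` produces for this input:

    sc = LocalSparkContext()
    rdd = sc.parallelize([(0, 1), (1, 2), (2, 3)])
    doubled = rdd.mapValues(lambda v: v * 2)  # lazy: nothing executes yet
    print(doubled.collect())                  # [(0, 2), (1, 4), (2, 6)]
    print(doubled.values().first())           # 2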
46 changes: 46 additions & 0 deletions bolt/localspark/construct.py
@@ -0,0 +1,46 @@
from bolt.construct import ConstructBase
from bolt.spark.construct import ConstructSpark as cs
import numpy as np
from bolt.localspark import LocalSparkContext


class ConstructLocalSpark(ConstructBase):

    @staticmethod
    def array(a, context=None, axis=(0,), dtype=None, npartitions=None):
        return cs.array(a, context=LocalSparkContext(), axis=axis,
                        dtype=dtype, npartitions=npartitions)

    @staticmethod
    def ones(shape, context=None, axis=(0,), dtype=np.float64, npartitions=None):
        return cs.ones(shape, context=LocalSparkContext(), axis=axis,
                       dtype=dtype, npartitions=npartitions)

    @staticmethod
    def zeros(shape, context=None, axis=(0,), dtype=np.float64, npartitions=None):
        return cs.zeros(shape, context=LocalSparkContext(), axis=axis,
                        dtype=dtype, npartitions=npartitions)

    @staticmethod
    def concatenate(arrays, axis=0):
        return cs.concatenate(arrays, axis=axis)

    @staticmethod
    def _argcheck(*args, **kwargs):
        """
        Check that arguments are consistent with localspark array construction.

        Condition is
        (1) keyword arg 'context' is a string containing 'fake'
        """
        context = kwargs.get('context', '')
        return isinstance(context, str) and 'fake' in context

    @staticmethod
    def _format_axes(axes, shape):
        return cs._format_axes(axes, shape)

    @staticmethod
    def _wrap(func, shape, context=None, axis=(0,), dtype=None, npartitions=None):
        return cs._wrap(func, shape=shape, context=LocalSparkContext(), axis=axis,
                        dtype=dtype, npartitions=npartitions)
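Assuming the factory registry routes a `context` string containing 'fake' to this backend (that routing happens in `bolt/factory.py`, outside this diff), construction should mirror the spark backend; in a direct call the passed context is ignored and a fresh `LocalSparkContext` is created:

    import numpy as np
    from bolt.localspark.construct import ConstructLocalSpark

    # builds a bolt spark array backed by the fake local context
    b = ConstructLocalSpark.array(np.arange(6).reshape(2, 3), context='fake')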