
pysparkling¶
Pysparkling provides a faster, more responsive way to develop programs for PySpark. It enables code intended for Spark applications to execute entirely in Python, without incurring the overhead of initializing and passing data through the JVM and Hadoop. The focus is on a lightweight and fast implementation for small datasets, at the expense of some data resilience and parallel-processing features.
How does it work? To switch execution of a script from PySpark to pysparkling, have the code initialize a pysparkling Context instead of a SparkContext, and use the pysparkling Context to set up your RDDs. The beauty is you don’t have to change a single line of code after the Context initialization, because pysparkling’s API is (almost) exactly the same as PySpark’s. Since it’s so easy to switch between PySpark and pysparkling, you can choose the right tool for your use case.
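For example (a minimal sketch, assuming your script only uses the Context and RDD API), the only line that changes is the Context initialization:
# with PySpark:
# from pyspark import SparkContext
# sc = SparkContext()
# with pysparkling, only the initialization changes:
from pysparkling import Context
sc = Context()
# everything below this point stays the same
print(sc.textFile('README.rst').count())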
When would I use it? Say you are writing a Spark application because you need robust computation on huge datasets, but you also want the same application to provide fast answers on a small dataset. You’re finding Spark is not responsive enough for your needs, but you don’t want to rewrite an entire separate application for the small-answers-fast problem. You’d rather reuse your Spark code but somehow get it to run fast. Pysparkling bypasses the stuff that causes Spark’s long startup times and less responsive feel.
Here are a few areas where pysparkling excels:
- Small to medium-scale exploratory data analysis
- Application prototyping
- Low-latency web deployments
- Unit tests
Features¶
- Supports the URI schemes s3://, hdfs://, gs://, http:// and file:// for Amazon S3, HDFS, Google Storage, web and local file access. Multiple files can be specified separated by commas, and * and ? wildcards are resolved (see the sketch below).
- Handles .gz, .zip, .lzma, .xz, .bz2, .tar, .tar.gz and .tar.bz2 compressed files. Supports reading of .7z files.
- Parallelization via multiprocessing.Pool, concurrent.futures.ThreadPoolExecutor or any other Pool-like object that has a map(func, iterable) method.
- Plain pysparkling does not have any dependencies (use pip install pysparkling). Some file access methods have optional dependencies: boto for AWS S3, requests for HTTP and hdfs for HDFS.
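A short, hypothetical sketch of how schemes, wildcards and comma-separated expressions combine in a single call (the bucket and paths are placeholders, and s3:// access needs the optional boto dependency):
from pysparkling import Context
sc = Context()
# mix schemes, wildcards and comma-separated expressions in one textFile() call
rdd = sc.textFile('s3://my-bucket/logs/2017-0?-*.gz,file:///var/log/local.log')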
Examples¶
Some demos are in the notebooks docs/demo.ipynb and docs/iris.ipynb.
Word Count
from pysparkling import Context
counts = (
Context()
.textFile('README.rst')
.map(lambda line: ''.join(ch if ch.isalnum() else ' ' for ch in line))
.flatMap(lambda line: line.split(' '))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
)
print(counts.collect())
which prints a long list of pairs of words and their counts.
Contents¶
Reading and Writing¶
This is a collection of best practices or templates for reading and writing various input and output formats.
Batch¶
Python List¶
The most direct input and output is from and to a Python list.
import pysparkling
sc = pysparkling.Context()
# reading
rdd = sc.parallelize(['hello', 'world'])
# back to Python list
print(rdd.collect())
# back to an iterator
rdd.toLocalIterator()
ND-JSON¶
Newline delimited JSON is a text file where every line is its own JSON string.
import json
import pysparkling
sc = pysparkling.Context()
# reading
rdd = (
sc
.textFile('input.json')
.map(json.loads)
)
# writing
(
rdd
.map(json.dumps)
.saveAsTextFile('output.json')
)
CSV¶
import csv
import io
import pysparkling
sc = pysparkling.Context()
# reading
rdd = (
sc
.textFile('input.csv')
.mapPartitions(csv.reader)
)
# writing
def csv_row(data):
s = io.StringIO()
csv.writer(s).writerow(data)
return s.getvalue()[:-1]
(
rdd
.map(csv_row)
.saveAsTextFile('output.csv')
)
TensorFlow Records¶
This example preprocesses example data into a TensorFlow Records file. The second part is a cross-check and prints the contents of the tfrecords file.
import pysparkling
import tensorflow as tf
def to_tfrecord(xy):
X, y = xy
example = tf.train.Example(features=tf.train.Features(feature={
'X': tf.train.Feature(float_list=tf.train.FloatList(value=X)),
'y': tf.train.Feature(int64_list=tf.train.Int64List(value=y)),
}))
return example.SerializeToString()
# example
X = [1.2, 3.1, 8.7]
y = [2, 5]
# writing
sc = pysparkling.Context()
rdd = (
sc
.parallelize([(X, y)])
.map(to_tfrecord)
)
with tf.python_io.TFRecordWriter('out.tfrecords') as writer:
for example in rdd.toLocalIterator():
writer.write(example)
# debugging a tf records file
for serialized_example in tf.python_io.tf_record_iterator('out.tfrecords'):
example = tf.train.Example()
example.ParseFromString(serialized_example)
X = example.features.feature['X'].float_list.value
y = example.features.feature['y'].int64_list.value
print(X, y)
API¶
A usual pysparkling session starts with either parallelizing a list with Context.parallelize() or reading data from a file with Context.textFile(). These two methods return RDD instances that can then be processed.
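A minimal sketch of such a session (using only the calls documented below):
from pysparkling import Context
sc = Context()
# start from a list, transform it and collect the result
print(sc.parallelize([1, 2, 3, 4]).map(lambda x: x * x).collect())
# [1, 4, 9, 16]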
RDD¶
-
class
pysparkling.
RDD
(partitions, ctx)[source]¶ RDD
In Spark’s original form, RDDs are Resilient, Distributed Datasets. This class reimplements the same interface with the goal of being fast on small data at the cost of resilience and distribution.
Parameters: - partitions (list) – A list of instances of Partition.
- ctx (Context) – An instance of the applicable Context.
-
compute
(split, task_context)[source]¶ interface to extend behavior for specific cases
Parameters: split (Partition) – a partition
-
aggregate
(zeroValue, seqOp, combOp)[source]¶ aggregate
[distributed]
Parameters: - zeroValue – The initial value to an aggregation, for example 0 or 0.0 for aggregating ints and floats, but any Python object is possible. Can be None.
- seqOp – A reference to a function that combines the current state with a new value. In the first iteration, the current state is zeroValue.
- combOp – A reference to a function that combines outputs of seqOp. In the first iteration, the current state is zeroValue.
Returns: Output of combOp operations.
Example:
>>> from pysparkling import Context >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) >>> Context().parallelize( ... [1, 2, 3, 4], 2 ... ).aggregate((0, 0), seqOp, combOp) (10, 4)
-
aggregateByKey
(zeroValue, seqFunc, combFunc, numPartitions=None)[source]¶ aggregate by key
Parameters: - zeroValue – The initial value to an aggregation, for example 0 or 0.0 for aggregating ints and floats, but any Python object is possible. Can be None.
- seqFunc – A reference to a function that combines the current state with a new value. In the first iteration, the current state is zeroValue.
- combFunc – A reference to a function that combines outputs of seqFunc. In the first iteration, the current state is zeroValue.
- numPartitions (int) – (optional) Not used.
Returns: An RDD with the output of combFunc operations.
Return type: RDD
Example:
>>> from pysparkling import Context >>> seqOp = (lambda x, y: x + y) >>> combOp = (lambda x, y: x + y) >>> r = Context().parallelize( ... [('a', 1), ('b', 2), ('a', 3), ('c', 4)] ... ).aggregateByKey(0, seqOp, combOp).collectAsMap() >>> (r['a'], r['b']) (4, 2)
-
cache
()[source]¶ Once a partition is computed, cache the result.
Alias for RDD.persist().
Example:
>>> import pysparkling >>> >>> n_exec = 0 >>> >>> def _map(e): ... global n_exec ... n_exec += 1 ... return e*e >>> >>> sc = pysparkling.Context() >>> my_rdd = sc.parallelize([1, 2, 3, 4], 2).map(_map).cache() >>> >>> # no exec until here >>> n_exec 0 >>> # to get first element, compute the first partition >>> my_rdd.first() 1 >>> n_exec 2 >>> # now compute the rest >>> my_rdd.collect() [1, 4, 9, 16] >>> n_exec 4 >>> # now _map() was executed on all partitions and should >>> # not be executed again >>> my_rdd.collect() [1, 4, 9, 16] >>> n_exec 4
-
cartesian
(other)[source]¶ cartesian product of this RDD with
other
Parameters: other (RDD) – Another RDD. Return type: RDD Note
This is currently implemented as a local operation requiring all data to be pulled on one machine.
Example:
>>> from pysparkling import Context >>> rdd = Context().parallelize([1, 2]) >>> sorted(rdd.cartesian(rdd).collect()) [(1, 1), (1, 2), (2, 1), (2, 2)]
-
coalesce
(numPartitions, shuffle=False)[source]¶ coalesce
Parameters: - numPartitions (int) – Number of partitions in the resulting RDD.
- shuffle – (optional) Not used.
Return type: Note
This is currently implemented as a local operation requiring all data to be pulled on one machine.
Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 3], 2).coalesce(1).getNumPartitions() 1
-
cogroup
(other, numPartitions=None)[source]¶ Groups keys from both RDDs together. Values are nested iterators.
Parameters: Return type: Example:
>>> from pysparkling import Context >>> c = Context() >>> a = c.parallelize([('house', 1), ('tree', 2)]) >>> b = c.parallelize([('house', 3)]) >>> >>> [(k, sorted(list([list(vv) for vv in v]))) ... for k, v in sorted(a.cogroup(b).collect()) ... ] [('house', [[1], [3]]), ('tree', [[], [2]])]
-
collect
()[source]¶ returns the entire dataset as a list
Return type: list Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 3]).collect() [1, 2, 3]
-
collectAsMap
()[source]¶ returns a dictionary for a pair dataset
Return type: dict Example:
>>> from pysparkling import Context >>> d = Context().parallelize([('a', 1), ('b', 2)]).collectAsMap() >>> (d['a'], d['b']) (1, 2)
-
count
()[source]¶ number of entries in this dataset
Return type: int Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 3], 2).count() 3
-
countByKey
()[source]¶ returns a dict containing the count for every key
Return type: dict Example:
>>> from pysparkling import Context >>> Context().parallelize( ... [('a', 1), ('b', 2), ('b', 2)] ... ).countByKey()['b'] 2
-
countByValue
()[source]¶ returns a dict containing the count for every value
Return type: dict Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 2, 4, 1]).countByValue()[2] 2
-
distinct
(numPartitions=None)[source]¶ returns only distinct elements
Parameters: numPartitions (int) – Number of partitions in the resulting RDD. Return type: RDD Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 2, 4, 1]).distinct().count() 3
-
filter
(f)[source]¶ filter elements
Parameters: f – a function that decides whether to keep an element Return type: RDD Example:
>>> from pysparkling import Context >>> Context().parallelize( ... [1, 2, 2, 4, 1, 3, 5, 9], 3, ... ).filter(lambda x: x % 2 == 0).collect() [2, 2, 4]
-
first
()[source]¶ returns the first element in the dataset
Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 2, 4, 1, 3, 5, 9], 3).first() 1
Works also with empty partitions:
>>> from pysparkling import Context >>> Context().parallelize([1, 2], 20).first() 1
-
flatMap
(f, preservesPartitioning=True)[source]¶ map followed by flatten
Parameters: - f – The map function.
- preservesPartitioning – (optional) Preserve the partitioning of the original RDD. Default True.
Return type: Example:
>>> from pysparkling import Context >>> Context().parallelize(['hello', 'world']).flatMap( ... lambda x: [ord(ch) for ch in x] ... ).collect() [104, 101, 108, 108, 111, 119, 111, 114, 108, 100]
-
flatMapValues
(f)[source]¶ map operation on values in a (key, value) pair followed by a flatten
Parameters: f – The map function. Return type: RDD Example:
>>> from pysparkling import Context >>> Context().parallelize([(1, 'hi'), (2, 'world')]).flatMapValues( ... lambda x: [ord(ch) for ch in x] ... ).collect() [(1, 104), (1, 105), (2, 119), (2, 111), (2, 114), (2, 108), (2, 100)]
-
fold
(zeroValue, op)[source]¶ fold
Parameters: - zeroValue – The initial value, for example 0 or 0.0.
- op – The reduce operation.
Returns: The folded (or aggregated) value.
Example:
>>> from pysparkling import Context >>> my_rdd = Context().parallelize([4, 7, 2]) >>> my_rdd.fold(0, lambda a, b: a+b) 13
-
foldByKey
(zeroValue, op)[source]¶ Fold (or aggregate) value by key.
Parameters: - zeroValue – The initial value, for example 0 or 0.0.
- op – The reduce operation.
Return type: RDD
Example:
>>> from pysparkling import Context >>> my_rdd = Context().parallelize([('a', 4), ('b', 7), ('a', 2)]) >>> my_rdd.foldByKey(0, lambda a, b: a+b).collectAsMap()['a'] 6
-
foreach
(f)[source]¶ applies f to every element
It does not return a new RDD like RDD.map().
Parameters: f – Apply a function to every element. Return type: None Example:
>>> from pysparkling import Context >>> my_rdd = Context().parallelize([1, 2, 3]) >>> a = [] >>> my_rdd.foreach(lambda x: a.append(x)) >>> len(a) 3
-
foreachPartition
(f)[source]¶ applies f to every partition
It does not return a new RDD like RDD.mapPartitions().
Parameters: f – Apply a function to every partition. Return type: None
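A usage sketch, assuming the same partitioning behavior as in the mapPartitions example:
Example:
>>> from pysparkling import Context
>>> sums = []
>>> Context().parallelize([1, 2, 3, 4], 2).foreachPartition(
...     lambda partition: sums.append(sum(partition)))
>>> sorted(sums)
[3, 7]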
-
fullOuterJoin
(other, numPartitions=None)[source]¶ returns the full outer join of two RDDs
The output contains all keys from both input RDDs, with missing keys replaced with None.
Parameters: Return type: Note
Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context >>> sc = Context() >>> rdd1 = sc.parallelize([('a', 0), ('b', 1)]) >>> rdd2 = sc.parallelize([('b', 2), ('c', 3)]) >>> sorted( ... rdd1.fullOuterJoin(rdd2).collect() ... ) [('a', (0, None)), ('b', (1, 2)), ('c', (None, 3))]
-
groupBy
(f, numPartitions=None)[source]¶ group by f
Parameters: - f – Function returning a key given an element of the dataset.
- numPartitions (int) – Number of partitions in the resulting RDD.
Return type: Note
Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context >>> my_rdd = Context().parallelize([4, 7, 2]) >>> my_rdd.groupBy(lambda x: x % 2).mapValues(sorted).collect() [(0, [2, 4]), (1, [7])]
-
groupByKey
(numPartitions=None)[source]¶ group by key
Parameters: numPartitions (int) – Number of partitions in the resulting RDD. Return type: RDD Note
Creating the new RDD is currently implemented as a local operation.
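A usage sketch; mapValues(sorted) normalizes the grouped values into sorted lists, mirroring the groupBy example above:
Example:
>>> from pysparkling import Context
>>> rdd = Context().parallelize([('a', 1), ('b', 2), ('a', 3)])
>>> sorted(rdd.groupByKey().mapValues(sorted).collect())
[('a', [1, 3]), ('b', [2])]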
-
histogram
(buckets)[source]¶ histogram
Parameters: buckets – A list of bucket boundaries or an int for the number of buckets. Returns: A tuple (bucket_boundaries, histogram_values) where bucket_boundaries is a list of length n+1 boundaries and histogram_values is a list of length n with the values of each bucket. Example:
>>> from pysparkling import Context >>> my_rdd = Context().parallelize([0, 4, 7, 4, 10]) >>> b, h = my_rdd.histogram(10) >>> h [1, 0, 0, 0, 2, 0, 0, 1, 0, 0, 1]
-
intersection
(other)[source]¶ intersection of this and other RDD
Parameters: other (RDD) – The other dataset to do the intersection with. Return type: RDD Note
Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context >>> rdd1 = Context().parallelize([0, 4, 7, 4, 10]) >>> rdd2 = Context().parallelize([3, 4, 7, 4, 5]) >>> rdd1.intersection(rdd2).collect() [4, 7]
-
join
(other, numPartitions=None)[source]¶ join
Parameters: Return type: Note
Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context >>> rdd1 = Context().parallelize([(0, 1), (1, 1)]) >>> rdd2 = Context().parallelize([(2, 1), (1, 3)]) >>> rdd1.join(rdd2).collect() [(1, (1, 3))]
-
keyBy
(f)[source]¶ key by f
Parameters: f – Function that returns a key from a dataset element. Return type: RDD Example:
>>> from pysparkling import Context >>> rdd = Context().parallelize([0, 4, 7, 4, 10]) >>> rdd.keyBy(lambda x: x % 2).collect() [(0, 0), (0, 4), (1, 7), (0, 4), (0, 10)]
-
keys
()[source]¶ keys of a pair dataset
Return type: RDD Example:
>>> from pysparkling import Context >>> Context().parallelize([(0, 1), (1, 1)]).keys().collect() [0, 1]
-
leftOuterJoin
(other, numPartitions=None)[source]¶ left outer join
Parameters: Return type: Note
Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context >>> rdd1 = Context().parallelize([(0, 1), (1, 1)]) >>> rdd2 = Context().parallelize([(2, 1), (1, 3)]) >>> rdd1.leftOuterJoin(rdd2).collect() [(0, (1, None)), (1, (1, 3))]
-
lookup
(key)[source]¶ Return all the (key, value) pairs where the given key matches.
Parameters: key – The key to lookup. Return type: list Example:
>>> from pysparkling import Context >>> Context().parallelize([(0, 1), (1, 1), (1, 3)]).lookup(1) [1, 3]
-
map
(f)[source]¶ map
Parameters: f – map function for elements Return type: RDD Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 3]).map(lambda x: x+1).collect() [2, 3, 4]
-
mapPartitions
(f, preservesPartitioning=False)[source]¶ map partitions
Parameters: f – map function for partitions Return type: RDD Example:
>>> from pysparkling import Context >>> rdd = Context().parallelize([1, 2, 3, 4], 2) >>> def f(iterator): ... yield sum(iterator) >>> rdd.mapPartitions(f).collect() [3, 7]
-
mapPartitionsWithIndex
(f, preservesPartitioning=False)[source]¶ map partitions with index
Parameters: f – map function for (index, partition) Return type: RDD Example:
>>> from pysparkling import Context >>> rdd = Context().parallelize([9, 8, 7, 6, 5, 4], 3) >>> def f(splitIndex, iterator): ... yield splitIndex >>> rdd.mapPartitionsWithIndex(f).sum() 3
-
mapValues
(f)[source]¶ map values in a pair dataset
Parameters: f – map function for values Return type: RDD
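A small usage sketch, assuming keys are passed through unchanged:
Example:
>>> from pysparkling import Context
>>> Context().parallelize([(1, 'hi'), (2, 'world')]).mapValues(len).collect()
[(1, 2), (2, 5)]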
-
max
()[source]¶ returns the maximum element
Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 3, 4, 3, 2], 2).max() == 4 True
-
mean
()[source]¶ returns the mean of this dataset
Example:
>>> from pysparkling import Context >>> Context().parallelize([0, 4, 7, 4, 10]).mean() 5.0
-
partitionBy
(numPartitions, partitionFunc=None)[source]¶ Return a partitioned copy of this key-value RDD.
Parameters: - numPartitions (int) – Number of partitions.
- partitionFunc (function) – Partition function.
Return type: Example where even numbers get assigned to partition 0 and odd numbers to partition 1:
>>> import pysparkling >>> sc = pysparkling.Context() >>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1) >>> keyvalue_rdd = rdd.map(lambda x: (x, x)) >>> keyvalue_rdd.partitionBy(2).keys().collect() [2, 8, 1, 3, 7, 5]
-
persist
(storageLevel=None)[source]¶ Cache the results of computed partitions.
Parameters: storageLevel – Not used.
-
pipe
(command, env=None)[source]¶ Run a command with the elements in the dataset as argument.
Parameters: - command – Command line command to run.
- env (dict) – environment variables
Return type: Warning
Unsafe for untrusted data.
Example:
>>> from pysparkling import Context >>> piped = Context().parallelize(['0', 'hello', 'world']).pipe('echo') >>> b'hello\n' in piped.collect() True
-
randomSplit
(weights, seed=None)[source]¶ Split the RDD into a few RDDs according to the given weights.
Parameters: Returns: a list of RDDs
Return type: list
Note
Creating the new RDDs is currently implemented as a local operation.
Example:
>>> from pysparkling import Context >>> rdd = Context().parallelize(range(500)) >>> rdd1, rdd2 = rdd.randomSplit([2, 3], seed=42) >>> (rdd1.count(), rdd2.count()) (199, 301)
-
reduce
(f)[source]¶ reduce
Parameters: f – A commutative and associative binary operator. Example:
>>> from pysparkling import Context >>> Context().parallelize([0, 4, 7, 4, 10]).reduce(lambda a, b: a+b) 25
-
reduceByKey
(f)[source]¶ reduce by key
Parameters: f – A commutative and associative binary operator. Return type: RDD Note
This operation includes a pysparkling.RDD.groupByKey(), which is a local operation.
Example:
>>> from pysparkling import Context >>> rdd = Context().parallelize([(0, 1), (1, 1), (1, 3)]) >>> rdd.reduceByKey(lambda a, b: a+b).collect() [(0, 1), (1, 4)]
-
repartition
(numPartitions)[source]¶ repartition
Parameters: numPartitions (int) – Number of partitions in new RDD. Return type: RDD Note
Creating the new RDD is currently implemented as a local operation.
-
repartitionAndSortWithinPartitions
(numPartitions=None, partitionFunc=None, ascending=True, keyfunc=None)[source]¶ Repartition and sort within each partition.
Parameters: - numPartitions (int) – Number of partitions in new RDD.
- partitionFunc – function that partitions
- ascending – Default is True.
- keyfunc – Returns the value that will be sorted.
Return type: Example where even numbers are assigned to partition 0 and odd numbers to partition 1 and then the partitions are sorted individually:
>>> import pysparkling >>> sc = pysparkling.Context() >>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1) >>> kv_rdd = rdd.map(lambda x: (x, x)) >>> processed = kv_rdd.repartitionAndSortWithinPartitions(2) >>> processed.keys().collect() [2, 8, 1, 3, 5, 7]
-
rightOuterJoin
(other, numPartitions=None)[source]¶ right outer join
Parameters: Return type: Note
Creating the new RDD is currently implemented as a local operation.
Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> rdd1 = sc.parallelize([(0, 1), (1, 1)]) >>> rdd2 = sc.parallelize([(2, 1), (1, 3)]) >>> sorted(rdd1.rightOuterJoin(rdd2).collect()) [(1, (1, 3)), (2, (None, 1))]
-
sample
(withReplacement, fraction, seed=None)[source]¶ randomly sample
Parameters: Return type: Sampling without replacement uses Bernoulli sampling and fraction is the probability that an element is sampled. Sampling with replacement uses Poisson sampling where fraction is the expectation.
Example:
>>> from pysparkling import Context >>> rdd = Context().parallelize(range(1000)) >>> sampled = rdd.sample(False, 0.1, seed=5).collect() >>> len(sampled) 115 >>> sampled_with_replacement = rdd.sample(True, 5.0, seed=5).collect() >>> len(sampled_with_replacement) in (5067, 5111) # w/o, w/ numpy True
-
sampleByKey
(withReplacement, fractions, seed=None)[source]¶ randomly sample by key
Parameters: Return type: Sampling without replacement uses Bernoulli sampling and fraction is the probability that an element is sampled. Sampling with replacement uses Poisson sampling where fraction is the expectation.
Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> fractions = {"a": 0.2, "b": 0.1} >>> rdd = (sc ... .parallelize(fractions.keys()) ... .cartesian(sc.parallelize(range(0, 1000)))) >>> sample = (rdd ... .sampleByKey(False, fractions, 2) ... .groupByKey().collectAsMap()) >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150 True >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0 True >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0 True
-
sampleStdev
()[source]¶ sample standard deviation
Return type: float Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 3]).sampleStdev() 1.0
-
sampleVariance
()[source]¶ sample variance
Return type: float Example:
>>> from pysparkling import Context >>> Context().parallelize([1, 2, 3]).sampleVariance() 1.0
-
saveAsPickleFile
(path, batchSize=10)[source]¶ save as pickle file
Returns: self
Return type: RDD Warning
The output of this function is incompatible with the PySpark output as there is no pure Python way to write Sequence files.
Example:
>>> from pysparkling import Context >>> from tempfile import NamedTemporaryFile >>> tmpFile = NamedTemporaryFile(delete=True) >>> tmpFile.close() >>> d = ['hello', 'world', 1, 2] >>> rdd = Context().parallelize(d).saveAsPickleFile(tmpFile.name) >>> 'hello' in Context().pickleFile(tmpFile.name).collect() True
-
saveAsTextFile
(path, compressionCodecClass=None)[source]¶ save as text file
If the RDD has a single partition, the contents are stored directly at the given path. If the RDD has multiple partitions, the data of each partition is stored in individual files under path/part-00000 and so on, and once all partitions are written, the file path/_SUCCESS is written last.
Parameters: - path – Destination of the text file.
- compressionCodecClass – Not used.
Returns: self
Return type: RDD
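A usage sketch mirroring the saveAsPickleFile example above; it assumes the written dataset can be read back with Context.textFile():
Example:
>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> rdd = Context().parallelize(['hello', 'world']).saveAsTextFile(tmpFile.name)
>>> Context().textFile(tmpFile.name).collect()
['hello', 'world']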
-
sortBy
(keyfunc, ascending=True, numPartitions=None)[source]¶ sort by keyfunc
Parameters: - keyfunc – Returns the value that will be sorted.
- ascending – Default is True.
- numPartitions (int) – Default is None. None means the output will have the same number of partitions as the input.
Return type: Note
Sorting is currently implemented as a local operation.
Examples:
>>> from pysparkling import Context >>> rdd = Context().parallelize([5, 1, 2, 3]) >>> rdd.sortBy(lambda x: x).collect() [1, 2, 3, 5]
>>> from pysparkling import Context >>> rdd = Context().parallelize([1, 5, 2, 3]) >>> rdd.sortBy(lambda x: x, ascending=False).collect() [5, 3, 2, 1]
-
sortByKey
(ascending=True, numPartitions=None, keyfunc=<operator.itemgetter object>)[source]¶ sort by key
Parameters: - ascending – Default is True.
- numPartitions (int) – Default is None. None means the output will have the same number of partitions as the input.
- keyfunc – Returns the value that will be sorted.
Return type: Note
Sorting is currently implemented as a local operation.
Examples:
>>> from pysparkling import Context >>> rdd = Context().parallelize( ... [(5, 'a'), (1, 'b'), (2, 'c'), (3, 'd')] ... ) >>> rdd.sortByKey().collect()[0][1] == 'b' True
>>> from pysparkling import Context >>> rdd = Context().parallelize( ... [(1, 'b'), (5, 'a'), (2, 'c'), (3, 'd')] ... ) >>> rdd.sortByKey(ascending=False).collect()[0][1] == 'a' True
-
stats
()[source]¶ stats
Return type: StatCounter Example:
>>> from pysparkling import Context >>> d = [1, 4, 9, 16, 25, 36] >>> s = Context().parallelize(d, 3).stats() >>> sum(d)/len(d) == s.mean() True
-
stdev
()[source]¶ standard deviation
Return type: float Example:
>>> from pysparkling import Context >>> Context().parallelize([1.5, 2.5]).stdev() 0.5
-
subtract
(other, numPartitions=None)[source]¶ subtract
Parameters: Return type: Example:
>>> from pysparkling import Context >>> rdd1 = Context().parallelize([(0, 1), (1, 1)]) >>> rdd2 = Context().parallelize([(1, 1), (1, 3)]) >>> rdd1.subtract(rdd2).collect() [(0, 1)]
-
sum
()[source]¶ sum of all the elements
Return type: float Example:
>>> from pysparkling import Context >>> Context().parallelize([0, 4, 7, 4, 10]).sum() 25
-
take
(n)[source]¶ Take n elements and return them in a list.
Only evaluates the partitions that are necessary to return n elements.
Parameters: n (int) – Number of elements to return. Return type: list Example:
>>> from pysparkling import Context >>> Context().parallelize([4, 7, 2]).take(2) [4, 7]
Another example where only the first two partitions are computed (check the debug logs):
>>> from pysparkling import Context >>> Context().parallelize([4, 7, 2], 3).take(2) [4, 7]
-
takeSample
(n)[source]¶ take sample
Assumes samples are evenly distributed between partitions. Only evaluates the partitions that are necessary to return n elements.
Parameters: n (int) – The number of elements to sample. Return type: list Example:
>>> from pysparkling import Context >>> Context().parallelize([4, 7, 2]).takeSample(1)[0] in [4, 7, 2] True
Another example where only one partition is computed (check the debug logs):
>>> from pysparkling import Context >>> d = [4, 9, 7, 3, 2, 5] >>> Context().parallelize(d, 3).takeSample(1)[0] in d True
-
toLocalIterator
()[source]¶ Returns an iterator over the dataset.
Example:
>>> from pysparkling import Context >>> sum(Context().parallelize([4, 9, 7, 3, 2, 5], 3).toLocalIterator()) 30
-
top
(num, key=None)[source]¶ Top N elements in descending order.
Parameters: - num (int) – number of elements
- key – optional key function
Return type: list
Example:
>>> from pysparkling import Context >>> r = Context().parallelize([4, 9, 7, 3, 2, 5], 3) >>> r.top(2) [9, 7]
-
union
(other)[source]¶ union
Parameters: other (RDD) – The other RDD for the union. Return type: RDD Example:
>>> from pysparkling import Context >>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3) >>> my_rdd.union(my_rdd).count() 12
-
variance
()[source]¶ The variance of the dataset.
Return type: float Example:
>>> from pysparkling import Context >>> Context().parallelize([1.5, 2.5]).variance() 0.25
-
zip
(other)[source]¶ zip
Parameters: other (RDD) – Other dataset to zip with. Return type: RDD Note
Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context >>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3) >>> my_rdd.zip(my_rdd).collect() [(4, 4), (9, 9), (7, 7), (3, 3), (2, 2), (5, 5)]
-
zipWithIndex
()[source]¶ Returns pairs of an original element and its index.
Return type: RDD Note
Creating the new RDD is currently implemented as a local operation.
Example:
>>> from pysparkling import Context >>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3) >>> my_rdd.zipWithIndex().collect() [(4, 0), (9, 1), (7, 2), (3, 3), (2, 4), (5, 5)]
-
zipWithUniqueId
()[source]¶ Zip every entry with a unique index.
This is a fast operation.
Return type: RDD Example:
>>> from pysparkling import Context >>> my_rdd = Context().parallelize([423, 234, 986, 5, 345], 3) >>> my_rdd.zipWithUniqueId().collect() [(423, 0), (234, 1), (986, 4), (5, 2), (345, 5)]
Context¶
A Context describes the setup. Instantiating a Context with the default arguments using Context() is the most lightweight setup. All data is just in the local thread and is never serialized or deserialized.
If you want to process the data in parallel, you can use the multiprocessing module. Given the limitations of the default pickle serializer, you can specify to serialize all methods with cloudpickle instead. For example, a common instantiation with multiprocessing looks like this:
import multiprocessing
import pickle
import cloudpickle
from pysparkling import Context

c = Context(
    multiprocessing.Pool(4),
    serializer=cloudpickle.dumps,
    deserializer=pickle.loads,
)
This assumes that your data is serializable with pickle, which is generally faster. You can also specify a custom serializer and deserializer for data.
-
class
pysparkling.
Context
(pool=None, serializer=None, deserializer=None, data_serializer=None, data_deserializer=None, max_retries=3, retry_wait=0.0, cache_manager=None)[source]¶ Context object similar to a Spark Context.
The variable _stats contains measured timing information about data and function (de)serialization and workload execution to benchmark your jobs.
Parameters: - pool – An instance with a map(func, iterable) method.
- serializer – Serializer for functions. Examples are pickle.dumps and dill.dumps.
- deserializer – Deserializer for functions. Examples are pickle.loads and dill.loads.
- data_serializer – Serializer for the data.
- data_deserializer – Deserializer for the data.
- max_retries (int) – maximum number of times a partition is retried
- retry_wait (float) – seconds to wait between retries
- cache_manager – custom cache manager (like TimedCacheManager)
-
parallelize
(x, numPartitions=None)[source]¶ Parallelize x.
Parameters: - x – An iterable (e.g. a list) that represents the data.
- numPartitions (int) – The number of partitions the data should be split into. A partition is a unit of data that is processed at a time.
Return type: RDD
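A small usage sketch:
Example:
>>> from pysparkling import Context
>>> rdd = Context().parallelize(range(5), 2)
>>> rdd.getNumPartitions()
2
>>> rdd.collect()
[0, 1, 2, 3, 4]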
-
pickleFile
(name, minPartitions=None)[source]¶ Read a pickle file.
Reads files created with RDD.saveAsPickleFile() into an RDD.
Parameters: - name – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- minPartitions – (optional) By default, every file is a partition, but this option allows splitting these further.
Return type: RDD
Example with a serialized list:
>>> import pickle >>> from pysparkling import Context >>> from tempfile import NamedTemporaryFile >>> tmpFile = NamedTemporaryFile(delete=True) >>> tmpFile.close() >>> with open(tmpFile.name, 'wb') as f: ... pickle.dump(['hello', 'world'], f) >>> Context().pickleFile(tmpFile.name).collect()[0] == 'hello' True
-
runJob
(rdd, func, partitions=None, allowLocal=False, resultHandler=None)[source]¶ This function is used by methods in the RDD.
Note that the maps are only inside generators and the resultHandler needs to take care of executing the ones that it needs. In other words, if you need everything to be executed, the resultHandler needs to be at least lambda x: list(x) to trigger execution of the generators.
Parameters: - func – Map function with signature func(TaskContext, Iterator over elements).
- partitions – List of partitions that are involved. None means the map job is applied to all partitions.
- allowLocal – Allows local execution.
- resultHandler – Process the result from the maps.
Returns: Result of resultHandler.
Return type: list
-
binaryFiles
(path, minPartitions=None)[source]¶ Read a binary file into an RDD.
Parameters: - path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- minPartitions – (optional) By default, every file is a partition, but this option allows splitting these further.
Return type: RDD
Warning
Not part of PySpark API.
Setting up examples:
>>> import os, pysparkling >>> from backports import tempfile >>> sc = pysparkling.Context() >>> decode = lambda bstring: bstring.decode()
Example with whole file:
>>> with tempfile.TemporaryDirectory() as tmp: ... with open(os.path.join(tmp, 'test.b'), 'wb') as f: ... _ = f.write(b'bellobello') ... sc.binaryFiles(tmp+'*').mapValues(decode).collect() [('...', 'bellobello')]
-
binaryRecords
(path, recordLength=None)[source]¶ Read a binary file into an RDD.
Parameters: - path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- recordLength – If None, every file is a record; an int means fixed-length records; and a string is used as a format string to struct to read the length of variable-length binary records.
Return type: RDD
Warning
Only an int recordLength is part of the PySpark API.
Setting up examples:
>>> import os, struct, pysparkling >>> from backports import tempfile >>> sc = pysparkling.Context() >>> decode = lambda bstring: bstring.decode()
Example with whole file:
>>> with tempfile.TemporaryDirectory() as tmp: ... with open(os.path.join(tmp, 'test.b'), 'wb') as f: ... _ = f.write(b'bellobello') ... sc.binaryRecords(tmp+'*').map(decode).collect() ['bellobello']
Example with fixed length records:
>>> with tempfile.TemporaryDirectory() as tmp: ... with open(os.path.join(tmp, 'test.b'), 'wb') as f: ... _ = f.write(b'bellobello') ... sc.binaryRecords(tmp+'*', recordLength=5).map(decode).collect() ['bello', 'bello']
Example with variable length records:
>>> with tempfile.TemporaryDirectory() as tmp: ... with open(os.path.join(tmp, 'test.b'), 'wb') as f: ... _ = f.write(struct.pack('<I', 5) + b'bello') ... _ = f.write(struct.pack('<I', 10) + b'bellobello') ... (sc.binaryRecords(tmp+'*', recordLength='<I') ... .map(decode).collect()) ['bello', 'bellobello']
-
textFile
(filename, minPartitions=None, use_unicode=True)[source]¶ Read a text file into an RDD.
Parameters: - filename – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- minPartitions – (optional) By default, every file is a partition, but this option allows splitting these further.
- use_unicode – (optional, default=True) Use utf8 if True and ascii if False.
Return type: RDD
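A usage sketch following the pattern of the pickleFile example above:
Example with a small text file:
>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> with open(tmpFile.name, 'w') as f:
...     _ = f.write('hello\nworld\n')
>>> Context().textFile(tmpFile.name).collect()
['hello', 'world']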
-
wholeTextFiles
(path, minPartitions=None, use_unicode=True)[source]¶ Read text files into an RDD of pairs of file name and file content.
Parameters: - path – Location of the files. Can include schemes like http://, s3:// and file://, wildcard characters ? and *, and multiple expressions separated by ,.
- minPartitions – (optional) By default, every file is a partition, but this option allows splitting these further.
- use_unicode – (optional, default=True) Use utf8 if True and ascii if False.
Return type: RDD
Streaming¶
Warning
This is a new addition to the API (March 2017) that should only be used with care.
StreamingContext¶
-
class
pysparkling.streaming.
StreamingContext
(sparkContext, batchDuration=None)[source]¶ Stream processing.
Parameters: - sparkContext (pysparkling.Context) – A pysparkling.Context.
- batchDuration (float) – Duration in seconds per batch.
-
sparkContext
¶ Return context of this StreamingContext.
-
awaitTermination
(timeout=None)[source]¶ Wait for context to stop.
Parameters: timeout (float) – in seconds
-
awaitTerminationOrTimeout
(timeout)[source]¶ Provided for compatibility. Same as
awaitTermination()
here.
-
binaryRecordsStream
(directory, recordLength=None, process_all=False)[source]¶ Monitor a directory and process all binary files.
File names starting with . are ignored.
Parameters: Return type: Warning
Only an int recordLength is supported in the PySpark API. The process_all parameter does not exist in the PySpark API.
-
queueStream
(rdds, oneAtATime=True, default=None)[source]¶ Create stream iterable over RDDs.
Parameters: - rdds – Iterable over RDDs or lists.
- oneAtATime – Process one at a time or all.
- default – If no more RDDs in rdds, return this. Can be None.
Return type: Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> ( ... ssc ... .queueStream([[4], [2], [7]]) ... .foreachRDD(lambda rdd: print(rdd.collect())) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.35) [4] [2] [7]
Example testing the default value:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> ( ... ssc ... .queueStream([[4], [2]], default=['placeholder']) ... .foreachRDD(lambda rdd: print(rdd.collect())) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.35) [4] [2] ['placeholder']
-
socketBinaryStream
(hostname, port, length)[source]¶ Create a TCP socket server for binary input.
Warning
This is not part of the PySpark API.
Parameters: - hostname (string) – Hostname of TCP server.
- port (int) – Port of TCP server.
- length – Message length. Length in bytes or a format string for struct.unpack().
For variable length messages where the message length is sent right before the message itself, length is a format string that can be passed to struct.unpack(). For example, use length='<I' for a little-endian (standard on x86) 32-bit unsigned int.
Return type:
-
stop
(stopSparkContext=True, stopGraceFully=False)[source]¶ Stop processing streams.
Parameters: - stopSparkContext – stop the SparkContext (NOT IMPLEMENTED)
- stopGracefully – stop gracefully (NOT IMPLEMENTED)
-
textFileStream
(directory, process_all=False)[source]¶ Monitor a directory and process all text files.
File names starting with . are ignored.
Parameters: Return type: Warning
The process_all parameter does not exist in the PySpark API.
-
fileTextStream
(directory, process_all=False)¶ Alias of
textFileStream()
.
-
fileBinaryStream
(directory, recordLength=None, process_all=False)¶ Alias of
binaryRecordsStream()
.
DStream¶
-
class
pysparkling.streaming.
DStream
(jdstream, ssc, jrdd_deserializer=None)[source]¶ A discrete stream of RDDs.
Usually a DStream is created by a pysparkling.streaming.StreamingContext method like pysparkling.streaming.StreamingContext.queueStream() and then operated on with the methods below.
- ssc (StreamingContext) – the streaming context
- jrdd_deserializer – a deserializer callable
-
cogroup
(other, numPartitions=None)[source]¶ Apply cogroup to RDDs of this and other DStream.
Parameters: other (DStream) – another DStream Return type: DStream Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> s1 = ssc.queueStream([[('a', 4), ('b', 2)], [('c', 7)]]) >>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]]) >>> s1.cogroup(s2).foreachRDD(lambda rdd: print(sorted(rdd.collect()))) >>> ssc.start() >>> ssc.awaitTermination(0.25) [('a', [[4], [1]]), ('b', [[2], [3]])] [('c', [[7], [8]])]
-
context
()[source]¶ Return the StreamContext of this stream.
Return type: StreamingContext
-
count
()[source]¶ Count elements per RDD.
Creates a new RDD stream where each RDD has a single entry that is the count of the elements.
Return type: DStream
-
countByValue
()[source]¶ Apply countByValue to every RDD.
Return type: DStream Warning
Implemented as a local operation.
Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> ( ... ssc ... .queueStream([[1, 1, 5, 5, 5, 2]]) ... .countByValue() ... .foreachRDD(lambda rdd: print(sorted(rdd.collect()))) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.15) [(1, 2), (2, 1), (5, 3)]
-
countByWindow
(windowDuration, slideDuration=None)[source]¶ Applies count() after window().
Parameters: Return type: Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> ( ... ssc ... .queueStream([[1, 1, 5], [5, 5, 2, 4], [1, 2]]) ... .countByWindow(0.2) ... .foreachRDD(lambda rdd: print(rdd.collect())) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.35) [3] [7] [6]
-
flatMap
(f, preservesPartitioning=False)[source]¶ Apply function f and flatten.
Parameters: f – mapping function Return type: DStream
-
flatMapValues
(f)[source]¶ Apply f to each value of a key-value pair.
Parameters: f – map function Return type: DStream
-
fullOuterJoin
(other, numPartitions=None)[source]¶ Apply fullOuterJoin to each pair of RDDs.
Return type: DStream Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> s1 = ssc.queueStream([[('a', 4), ('b', 2)], [('c', 7)]]) >>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]]) >>> ( ... s1.fullOuterJoin(s2) ... .foreachRDD(lambda rdd: print(sorted(rdd.collect()))) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.25) [('a', (4, 1)), ('b', (2, 3))] [('c', (7, 8))]
Example with repeated keys:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> s1 = ssc.queueStream([[('a', 4), ('a', 2)], [('c', 7)]]) >>> s2 = ssc.queueStream([[('b', 1)], [('c', 8)]]) >>> ( ... s1.fullOuterJoin(s2) ... .foreachRDD(lambda rdd: print(sorted(rdd.collect()))) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.25) [('a', (2, None)), ('a', (4, None)), ('b', (None, 1))] [('c', (7, 8))]
-
join
(other, numPartitions=None)[source]¶ Apply join to each pair of RDDs.
Return type: DStream Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]]) >>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]]) >>> ( ... s1.join(s2) ... .foreachRDD(lambda rdd: print(sorted(rdd.collect()))) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.25) [('a', (4, 1))] [('c', (7, 8))]
-
leftOuterJoin
(other, numPartitions=None)[source]¶ Apply leftOuterJoin to each pair of RDDs.
Return type: DStream Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]]) >>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]]) >>> ( ... s1.leftOuterJoin(s2) ... .foreachRDD(lambda rdd: print(sorted(rdd.collect()))) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.25) [('a', (4, 1)), ('e', (2, None))] [('c', (7, 8))]
-
map
(f, preservesPartitioning=False)[source]¶ Apply function f
Parameters: f – mapping function Return type: DStream Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> ( ... ssc ... .queueStream([[4], [2], [7]]) ... .map(lambda e: e + 1) ... .foreachRDD(lambda rdd: print(rdd.collect())) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.35) [5] [3] [8]
-
mapPartitions
(f, preservesPartitioning=False)[source]¶ Map partitions.
Parameters: f – mapping function Return type: DStream
-
mapPartitionsWithIndex
(f, preservesPartitioning=False)[source]¶ Apply a map function that takes an index and the data.
Map partitions with a function that takes the partition index and an iterator over the partition data as arguments.
Parameters: f – mapping function Return type: DStream
-
mapValues
(f)[source]¶ Apply f to every value.
Return type: DStream Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> ( ... ssc ... .queueStream([[('a', 4)], [('b', 2)], [('c', 7)]]) ... .mapValues(lambda e: e + 1) ... .foreachRDD(lambda rdd: print(rdd.collect())) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.35) [('a', 5)] [('b', 3)] [('c', 8)]
-
pprint
(num=10)[source]¶ Print the first
num
elements of each RDD.Parameters: num (int) – Set number of elements to be printed.
-
reduce
(func)[source]¶ Return a new DStream where each RDD was reduced with
func
.Return type: DStream
-
reduceByKey
(func, numPartitions=None)[source]¶ Apply reduceByKey to every RDD.
Parameters: - func – reduce function to apply
- numPartitions (int) – number of partitions
Return type:
-
repartition
(numPartitions)[source]¶ Repartition every RDD.
Return type: DStream Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> ( ... ssc ... .queueStream([['hello', 'world']]) ... .repartition(2) ... .foreachRDD(lambda rdd: print(len(rdd.partitions()))) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.25) 2 0
-
rightOuterJoin
(other, numPartitions=None)[source]¶ Apply rightOuterJoin to each pair of RDDs.
Return type: DStream Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]]) >>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]]) >>> ( ... s1.rightOuterJoin(s2) ... .foreachRDD(lambda rdd: print(sorted(rdd.collect()))) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.25) [('a', (4, 1)), ('b', (None, 3))] [('c', (7, 8))]
-
saveAsTextFiles
(prefix, suffix=None)[source]¶ Save every RDD as a text file (or sets of text files).
Parameters: Example:
>>> from backports import tempfile >>> import os, pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> with tempfile.TemporaryDirectory() as tmp_dir: ... ( ... ssc.queueStream([['hello', 'world'], [1, 2]]) ... .saveAsTextFiles(os.path.join(tmp_dir, 'textout')) ... ) ... ssc.start() ... ssc.awaitTermination(0.25) ... result = sc.textFile(tmp_dir + '*').collect() >>> result ['hello', 'world', '1', '2']
-
slice
(begin, end)[source]¶ Filter RDDs to between begin and end.
Parameters: - begin (datetime.datetime|int) – datetime or unix timestamp
- end (datetime.datetime|int) – datetime or unix timestamp
Return type:
-
transform
(func)[source]¶ Return a new DStream where each RDD is transformed by func.
Parameters: func – Function that transforms an RDD. Return type: DStream
-
transformWith
(func, other, keepSerializer=False)[source]¶ Return a new DStream where each RDD is transformed by func.
Parameters: func – transformation function Return type: DStream
The transformation function can have arguments (time, rdd_a, rdd_b) or (rdd_a, rdd_b).
-
union
(other)[source]¶ Union of two DStreams.
Parameters: other (DStream) – Another DStream. Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1) >>> odd = ssc.queueStream([[1], [3], [5]]) >>> even = ssc.queueStream([[2], [4], [6]]) >>> ( ... odd.union(even) ... .foreachRDD(lambda rdd: print(rdd.collect())) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.35) [1, 2] [3, 4] [5, 6]
-
updateStateByKey
(func)[source]¶ Process with state.
Parameters: func – Evaluated per key. Takes list of input_values and a state. Return type: DStream This example shows how to return the latest value per key:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2) >>> ( ... ssc ... .queueStream([[('a', 1), ('b', 3)], [('a', 2), ('c', 4)]]) ... .updateStateByKey(lambda input_values, state: ... state ... if not input_values ... else input_values[-1]) ... .foreachRDD(lambda rdd: print(sorted(rdd.collect()))) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.5) [('a', 1), ('b', 3)] [('a', 2), ('b', 3), ('c', 4)]
This example counts values per key:
>>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2) >>> ( ... ssc ... .queueStream([[('a', 1)], [('a', 2), ('b', 4), ('b', 3)]]) ... .updateStateByKey(lambda input_values, state: ... (state if state is not None else 0) + ... sum(input_values)) ... .foreachRDD(lambda rdd: print(sorted(rdd.collect()))) ... ) >>> ssc.start() >>> ssc.awaitTermination(0.5) [('a', 1)] [('a', 3), ('b', 7)]
-
window
(windowDuration, slideDuration=None)[source]¶ Windowed RDD.
Parameters: Return type: Example:
>>> import pysparkling >>> sc = pysparkling.Context() >>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2) >>> ( ... ssc ... .queueStream([[1], [2], [3], [4], [5], [6]]) ... .window(0.6) ... .foreachRDD(lambda rdd: print(rdd.collect())) ... ) >>> ssc.start() >>> ssc.awaitTermination(1.3) [1] [1, 2] [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 6]
fileio¶
The functionality provided by this module is used in Context.textFile() for reading and in RDD.saveAsTextFile() for writing.
You can use this submodule with File.dump(), File.load() and File.exists() to write, read and check for existence of a file. All methods transparently handle various schemes (for example http://, s3:// and file://) and compression/decompression of .gz and .bz2 files (among others).
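A small sketch of that usage, assuming dump(), load() and exists() behave as described above (the file name is a placeholder; the .gz suffix relies on the transparent compression mentioned above):
import io
from pysparkling.fileio import File

f = File('outfile.txt.gz')          # placeholder path, compressed transparently
f.dump(io.BytesIO(b'hello world'))  # write
print(f.exists())                   # check for existence
print(f.load().read())              # read back: b'hello world'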
-
class
pysparkling.fileio.
File
(file_name)[source]¶ File object.
Parameters: file_name – Any file name. -
static
resolve_filenames
(all_expr)[source]¶ resolve expression for a filename
Parameters: all_expr – A comma separated list of expressions. The expressions can contain the wildcard characters * and ?. It also resolves Spark datasets to the paths of the individual partitions (i.e. my_data gets resolved to [my_data/part-00000, my_data/part-00001]).
Returns: A list of file names. Return type: list
-
load
()[source]¶ Load the data from a file.
Return type: io.BytesIO
-
static
-
class
pysparkling.fileio.
TextFile
(file_name)[source]¶ Derived from
File
.Parameters: file_name – Any text file name.
File System¶
-
class
pysparkling.fileio.fs.
FileSystem
(file_name)[source]¶ Interface class for the file system.
Parameters: file_name (str) – File name. -
static
resolve_filenames
(expr)[source]¶ Resolve the given glob-like expression to filenames.
Return type: list
-
load
()[source]¶ Load a file to a stream.
Return type: io.BytesIO
-
load_text
(encoding='utf8', encoding_errors='ignore')[source]¶ Load a file to a stream.
Parameters: Return type:
-
dump
(stream)[source]¶ Dump a stream to a file.
Parameters: stream (io.BytesIO) – Input tream.
-
make_public
(recursive=False)[source]¶ Make the file public (only on some file systems).
Parameters: recursive (bool) – Recurse. Return type: FileSystem
-
static
-
class
pysparkling.fileio.fs.
Local
(file_name)[source]¶ FileSystem
implementation for the local file system.
-
class
pysparkling.fileio.fs.
GS
(file_name)[source]¶ FileSystem
implementation for Google Storage.Paths are of the form
gs://bucket_name/file_path
orgs://project_name:bucket_name/file_path
.-
mime_type
= 'text/plain'¶ Default mime type.
-
project_name
= None¶ Set a default project name.
-
-
class
pysparkling.fileio.fs.
Hdfs
(file_name)[source]¶ FileSystem
implementation for HDFS.
-
class
pysparkling.fileio.fs.
Http
(file_name)[source]¶ FileSystem
implementation for HTTP.
-
class
pysparkling.fileio.fs.
S3
(file_name)[source]¶ FileSystem
implementation for S3.Use environment variables
AWS_SECRET_ACCESS_KEY
andAWS_ACCESS_KEY_ID
for auth and use file paths of the forms3://bucket_name/filename.txt
.-
connection_kwargs
= {}¶ Keyword arguments for new connections. Example: set to {'anon': True} for anonymous connections.
-
Codec¶
-
class
pysparkling.fileio.codec.
Codec
[source]¶ Codec.
-
compress
(stream)[source]¶ Compress.
Parameters: stream (io.BytesIO) – Uncompressed input stream. Return type: io.BytesIO
-
decompress
(stream)[source]¶ Decompress.
Parameters: stream (io.BytesIO) – Compressed input stream. Return type: io.BytesIO
-
-
class
pysparkling.fileio.codec.
Lzma
[source]¶ Implementation of
Codec
for lzma compression.Needs Python >= 3.3.
Parallelization¶
Pysparkling supports parallelization on the local machine and across clusters of computers.
Processes and Threads¶
Single machine parallelization with concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor or multiprocessing.Pool is supported. Use cloudpickle instead of pickle for serialization to support lambda functions (and more) for data transformations.
import concurrent.futures
import pickle

import cloudpickle
import pysparkling

sc = pysparkling.Context(
    pool=concurrent.futures.ProcessPoolExecutor(4),
    serializer=cloudpickle.dumps,
    deserializer=pickle.loads,
)
Experimental¶
The following are experimental notes. Most of them don’t even contain examples of how to make use of these techniques with pysparkling.
ipcluster and IPython.parallel¶
Local test setup:
ipcluster start --n=2
from IPython.parallel import Client
c = Client()
print(c[:].map(lambda _: 'hello world', range(2)).get())
which should print ['hello world', 'hello world'].
To run on a cluster, create a profile:
ipython profile create --parallel --profile=smallcluster
# start controller:
# Creates ~/.ipython/profile_smallcluster/security/ipcontroller-engine.json
# which is used by the engines to identify the location of this controller.
# This is the local-only IP address. Substitute with the machine's IP
# address so that the engines can find it.
ipcontroller --ip=127.0.0.1 --port=7123 --profile=smallcluster
# start engines (assuming they have access to the
# ipcontroller-engine.json file)
ipengine --profile=smallcluster
Test it in Python:
from IPython.parallel import Client
c = Client(profile='smallcluster')
print(c[:].map(lambda _: 'hello world', range(2)).get())
If you don’t want to start the engines manually, ipcluster comes with “Launchers” that can start them for you:
https://ipython.org/ipython-doc/dev/parallel/parallel_process.html#using-ipcluster-in-ssh-mode
StarCluster¶
Setting up StarCluster was an experiment. However, it does not integrate well with the rest of our EC2 infrastructure, so we switched to a Chef-based setup where we use ipcluster directly. A blocker was that the number of engines per node is not configurable and we have many map jobs that wait on external responses.
Setup
# install
pip install starcluster
# create configuration
starcluster help # choose the option to create a sample config file
# add your user id, aws_access_key_id and aws_secret_access_key to config
# create an ssh key (this creates a new key just for starcluster)
# and registers it with AWS
starcluster createkey starclusterkey -o ~/.ssh/starclusterkey.rsa
# add this key to config:
[key starclusterkey]
KEY_LOCATION=~/.ssh/starclusterkey.rsa
# and use this key in the cluster setup:
KEYNAME = starclusterkey
# disable the queue, Sun Grid Engine
# (unnecessary for pysparkling and takes time during setup)
DISABLE_QUEUE=True
# to enable IPython parallel support, uncomment these lines in config:
[plugin ipcluster]
SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster
# and make sure you have this line inside the cluster section
[cluster smallcluster]
PLUGINS = ipcluster
# start the cluster
starcluster start smallcluster
# check it has started
starcluster listclusters
We currently use ami-da180db2 (Ubuntu 14.04 with 100GB EBS) on m3.medium instances.
Workarounds:
# this seems to be a dependency that does not get installed
pip install pexpect
# to validate the ssh host, you need to log in once manually, to add it
# to the list of known hosts
starcluster sshmaster smallcluster
In Python, you should now be able to run
from IPython.parallel import Client
# the exact command is printed after the cluster started
rc = Client('/Users/sven/.starcluster/ipcluster/SecurityGroup:@sc-smallcluster-us-east-1.json',
sshkey='/Users/sven/.ssh/starclusterkey.rsa', packer='pickle')
view = rc[:]
results = view.map(lambda x: x**30, range(8))
print(results.get())
which is also in tests/starcluster_simple.py.
Install your own software that is not on pypi:
pip install wheel
python setup.py bdist_wheel # add --universal for Python2 and 3 packages
starcluster put smallcluster dist/your_package_name.whl /home/sgeadmin/your_package_name.whl
# ssh into remote machine
starcluster sshmaster smallcluster
> pip install --upgrade pip
> pip install wheel
> pip2.7 install /home/sgeadmin/your_package_name.whl
Development¶
Fork the GitHub repository and apply your changes in a feature branch. To run pysparkling’s unit tests:
# install
pip install -e .[hdfs,performance,streaming,tests]
flake8 --install-hook
# run linting and test
flake8
PERFORMANCE=1 nosetests -vv
Don’t run python setup.py test as this will not execute the doctests. When all tests pass, create a Pull Request on GitHub.
Please also update HISTORY.rst with a short description of your change.
To preview the docs locally, install the extra dependencies with pip install -r docs/requirements.txt, then cd into docs/sphinx, run make html and open docs/sphinx/_build/html/index.html.
Please also try not to add derivative work from other projects. If you do, incorporate proper handling of external licenses in your Pull Request.