
pysparkling

Pysparkling provides a faster, more responsive way to develop programs for PySpark. It enables code intended for Spark applications to execute entirely in Python, without incurring the overhead of initializing and passing data through the JVM and Hadoop. The focus is on having a lightweight and fast implementation for small datasets at the expense of some data resilience features and some parallel processing features.

How does it work? To switch execution of a script from PySpark to pysparkling, have the code initialize a pysparkling Context instead of a SparkContext, and use the pysparkling Context to set up your RDDs. The beauty is you don’t have to change a single line of code after the Context initialization, because pysparkling’s API is (almost) exactly the same as PySpark’s. Since it’s so easy to switch between PySpark and pysparkling, you can choose the right tool for your use case.

When would I use it? Say you are writing a Spark application because you need robust computation on huge datasets, but you also want the same application to provide fast answers on a small dataset. You’re finding Spark is not responsive enough for your needs, but you don’t want to rewrite an entire separate application for the small-answers-fast problem. You’d rather reuse your Spark code but somehow get it to run fast. Pysparkling bypasses the JVM and Hadoop machinery that causes Spark’s long startup times and less responsive feel.

Here are a few areas where pysparkling excels:

  • Small to medium-scale exploratory data analysis
  • Application prototyping
  • Low-latency web deployments
  • Unit tests

Install

pip install pysparkling[s3,hdfs,http,streaming]

Documentation: readthedocs

Other links: Github, Issue Tracker

Features

  • Supports URI schemes s3://, hdfs://, gs://, http:// and file:// for Amazon S3, HDFS, Google Storage, web and local file access. Multiple files can be specified, separated by commas. Resolves * and ? wildcards.
  • Handles .gz, .zip, .lzma, .xz, .bz2, .tar, .tar.gz and .tar.bz2 compressed files. Supports reading of .7z files.
  • Parallelization via multiprocessing.Pool, concurrent.futures.ThreadPoolExecutor or any other Pool-like objects that have a map(func, iterable) method.
  • Plain pysparkling does not have any dependencies (use pip install pysparkling). Some file access methods have optional dependencies: boto for AWS S3, requests for http and hdfs for HDFS.

Examples

Some demos are in the notebooks docs/demo.ipynb and docs/iris.ipynb.

Word Count

from pysparkling import Context

counts = (
    Context()
    .textFile('README.rst')
    .map(lambda line: ''.join(ch if ch.isalnum() else ' ' for ch in line))
    .flatMap(lambda line: line.split())  # split() skips empty strings between spaces
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)
print(counts.collect())

which prints a long list of pairs of words and their counts.
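For comparison, here is what the pipeline computes, written as plain Python with `collections.Counter`. This is a sketch only; `word_count` is a hypothetical helper, not part of pysparkling. It splits with `str.split()` without an argument, which, unlike `split(' ')`, does not produce empty strings for runs of spaces.

```python
import collections


def word_count(lines):
    """Plain-Python equivalent of the Word Count pipeline (hypothetical helper)."""
    counts = collections.Counter()
    for line in lines:
        # replace every non-alphanumeric character with a space, as in the first map()
        cleaned = ''.join(ch if ch.isalnum() else ' ' for ch in line)
        # split() without an argument never yields empty strings
        counts.update(cleaned.split())
    return counts


print(word_count(['Hello, world!', 'hello world']))
```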

Contents

Reading and Writing

This is a collection of best practices or templates for reading and writing various input and output formats.

Batch

Python List

The most direct input and output is from and to a Python list.

import pysparkling

sc = pysparkling.Context()

# reading
rdd = sc.parallelize(['hello', 'world'])

# back to Python list
print(rdd.collect())

# back to an iterator
rdd.toLocalIterator()

ND-JSON

Newline-delimited JSON is a text file in which every line is a complete JSON document.

import json
import pysparkling

sc = pysparkling.Context()

# reading
rdd = (
    sc
    .textFile('input.json')
    .map(json.loads)
)

# writing
(
    rdd
    .map(json.dumps)
    .saveAsTextFile('output.json')
)

CSV

import csv
import io
import pysparkling

sc = pysparkling.Context()

# reading
rdd = (
    sc
    .textFile('input.csv')
    .mapPartitions(csv.reader)
)

# writing
def csv_row(data):
    s = io.StringIO()
    csv.writer(s).writerow(data)
    # strip the '\r\n' added by csv.writer; saveAsTextFile writes one element per line
    return s.getvalue().rstrip('\r\n')

(
    rdd
    .map(csv_row)
    .saveAsTextFile('output.csv')
)
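The writer and reader halves can be checked without pysparkling. This standard-library sketch uses a variant of the csv_row helper that strips the `\r\n` terminator `csv.writer` appends (assuming each element becomes one line of the output file), then parses the lines back with `csv.reader`, including a field that contains a comma.

```python
import csv
import io


def csv_row(data):
    """Format one row as a CSV line without the trailing line terminator."""
    s = io.StringIO()
    csv.writer(s).writerow(data)
    # csv.writer ends every row with '\r\n'; drop it so each row is one clean line
    return s.getvalue().rstrip('\r\n')


rows = [['name', 'comment'], ['a', 'contains, a comma'], ['b', 'plain']]
lines = [csv_row(row) for row in rows]
# csv.reader accepts any iterable of lines, like the iterator mapPartitions passes in
parsed = list(csv.reader(lines))
print(lines)
print(parsed == rows)
```

Fields containing line breaks will not round-trip this way, since textFile() splits its input into lines.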

TensorFlow Records

This example preprocesses example data into a TensorFlow Records file. The second part is a cross check and prints the contents of the tfrecords file.

import pysparkling
import tensorflow as tf

def to_tfrecord(xy):
    X, y = xy
    example = tf.train.Example(features=tf.train.Features(feature={
        'X': tf.train.Feature(float_list=tf.train.FloatList(value=X)),
        'y': tf.train.Feature(int64_list=tf.train.Int64List(value=y)),
    }))
    return example.SerializeToString()

# example
X = [1.2, 3.1, 8.7]
y = [2, 5]

# writing
sc = pysparkling.Context()
rdd = (
    sc
    .parallelize([(X, y)])
    .map(to_tfrecord)
)
with tf.io.TFRecordWriter('out.tfrecords') as writer:
    for example in rdd.toLocalIterator():
        writer.write(example)

# debugging a tf records file
for serialized_example in tf.compat.v1.io.tf_record_iterator('out.tfrecords'):
    example = tf.train.Example()
    example.ParseFromString(serialized_example)
    X = example.features.feature['X'].float_list.value
    y = example.features.feature['y'].int64_list.value
    print(X, y)

Streaming

Python List

import pysparkling
import pysparkling.streaming

sc = pysparkling.Context()
ssc = pysparkling.streaming.StreamingContext(sc, 1.0)

(
    ssc
    .queueStream([[4], [2], [7]])
    .foreachRDD(lambda rdd: print(rdd.collect()))
)

ssc.start()
ssc.awaitTermination(3.5)

# output:
# [4]
# [2]
# [7]

API

A usual pysparkling session starts either by parallelizing a list with Context.parallelize() or by reading data from a file with Context.textFile(). Both methods return RDD instances that can then be processed.

RDD

class pysparkling.RDD(partitions, ctx)

RDD

In Spark’s original form, RDDs are Resilient, Distributed Datasets. This class reimplements the same interface with the goal of being fast on small data at the cost of being resilient and distributed.

Parameters:
  • partitions (list) – A list of instances of Partition.
  • ctx (Context) – An instance of the applicable Context.

compute(split, task_context)

interface to extend behavior for specific cases

Parameters: split (Partition) – a partition

aggregate(zeroValue, seqOp, combOp)

aggregate

[distributed]

Parameters:
  • zeroValue – The initial value of an aggregation, for example 0 or 0.0 for aggregating ints and floats, but any Python object is possible. Can be None.
  • seqOp – A reference to a function that combines the current state with a new value. In the first iteration, the current state is zeroValue.
  • combOp – A reference to a function that combines outputs of seqOp. In the first iteration, the current state is zeroValue.
Returns:

Output of combOp operations.

Example:

>>> from pysparkling import Context
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> Context().parallelize(
...     [1, 2, 3, 4], 2
... ).aggregate((0, 0), seqOp, combOp)
(10, 4)
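To make the roles of the three arguments concrete, here is a plain-Python sketch of what aggregate computes, assuming the data [1, 2, 3, 4] is split into the partitions [1, 2] and [3, 4]. seqOp folds each partition starting from zeroValue, and combOp then merges the per-partition results, again starting from zeroValue. In this sketch zeroValue is used once per partition plus once more, so it should be a neutral element such as (0, 0).

```python
from functools import reduce


def aggregate(partitions, zero_value, seq_op, comb_op):
    """Plain-Python sketch of RDD.aggregate over an explicit list of partitions."""
    # seq_op folds each partition separately, starting from zero_value
    per_partition = [reduce(seq_op, part, zero_value) for part in partitions]
    # comb_op merges the per-partition results, again starting from zero_value
    return reduce(comb_op, per_partition, zero_value)


seqOp = lambda x, y: (x[0] + y, x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])
print(aggregate([[1, 2], [3, 4]], (0, 0), seqOp, combOp))  # (10, 4)
```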

aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None)

aggregate by key

Parameters:
  • zeroValue – The initial value of an aggregation, for example 0 or 0.0 for aggregating ints and floats, but any Python object is possible. Can be None.
  • seqFunc – A reference to a function that combines the current state with a new value. In the first iteration, the current state is zeroValue.
  • combFunc – A reference to a function that combines outputs of seqFunc. In the first iteration, the current state is zeroValue.
  • numPartitions (int) – (optional) Not used.
Returns:

An RDD with the output of combFunc operations.

Return type:

RDD

Example:

>>> from pysparkling import Context
>>> seqOp = (lambda x, y: x + y)
>>> combOp = (lambda x, y: x + y)
>>> r = Context().parallelize(
...     [('a', 1), ('b', 2), ('a', 3), ('c', 4)]
... ).aggregateByKey(0, seqOp, combOp).collectAsMap()
>>> (r['a'], r['b'])
(4, 2)

cache()

Once a partition is computed, cache the result.

Alias for RDD.persist().

Example:

>>> from pysparkling import Context
>>> from pysparkling import CacheManager
>>> import logging
>>>
>>> n_exec = 0
>>>
>>> def _map(e):
...     global n_exec
...     n_exec += 1
...     return e*e
>>>
>>> my_rdd = Context().parallelize([1, 2, 3, 4], 2)
>>> my_rdd = my_rdd.map(_map).cache()
>>>
>>> logging.info('no exec until here')
>>> f = my_rdd.first()
>>> logging.info('available caches in {1}: {0}'.format(
...     CacheManager.singleton().stored_idents(),
...     CacheManager.singleton(),
... ))
>>>
>>> logging.info('executed map on first partition only so far')
>>> a = my_rdd.collect()
>>> logging.info('available caches in {1}: {0}'.format(
...     CacheManager.singleton().stored_idents(),
...     CacheManager.singleton(),
... ))
>>>
>>> logging.info('now _map() was executed on all partitions and should '
...              'not be executed again')
>>> logging.info('available caches in {1}: {0}'.format(
...     CacheManager.singleton().stored_idents(),
...     CacheManager.singleton(),
... ))
>>> (my_rdd.collect(), n_exec)
([1, 4, 9, 16], 4)

cartesian(other)

cartesian product of this RDD with other

Parameters: other (RDD) – Another RDD.
Return type: RDD

Note

This is currently implemented as a local operation requiring all data to be pulled on one machine.

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]

coalesce(numPartitions, shuffle=False)

coalesce

Parameters:
  • numPartitions (int) – Number of partitions in the resulting RDD.
  • shuffle – (optional) Not used.
Return type:

RDD

Note

This is currently implemented as a local operation requiring all data to be pulled on one machine.

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).coalesce(1).getNumPartitions()
1

cogroup(other, numPartitions=None)

Groups keys from both RDDs together. Values are nested iterators.

Parameters:
  • other (RDD) – The other RDD.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Example:

>>> from pysparkling import Context
>>> c = Context()
>>> a = c.parallelize([('house', 1), ('tree', 2)])
>>> b = c.parallelize([('house', 3)])
>>>
>>> [(k, sorted(list([list(vv) for vv in v])))
...  for k, v in sorted(a.cogroup(b).collect())
... ]
[('house', [[1], [3]]), ('tree', [[], [2]])]

collect()

returns the entire dataset as a list

Return type: list

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).collect()
[1, 2, 3]

collectAsMap()

returns a dictionary for a pair dataset

Return type: dict

Example:

>>> from pysparkling import Context
>>> d = Context().parallelize([('a', 1), ('b', 2)]).collectAsMap()
>>> (d['a'], d['b'])
(1, 2)

count()

number of entries in this dataset

Return type: int

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3], 2).count()
3

countApprox()

same as RDD.count()

Return type: int

countByKey()

returns a dict containing the count for every key

Return type: dict

Example:

>>> from pysparkling import Context
>>> Context().parallelize(
...     [('a', 1), ('b', 2), ('b', 2)]
... ).countByKey()['b']
2

countByValue()

returns a dict containing the count for every value

Return type: dict

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 2, 4, 1]).countByValue()[2]
2

distinct(numPartitions=None)

returns only distinct elements

Parameters: numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 2, 4, 1]).distinct().count()
3

filter(f)

filter elements

Parameters: f – a function that decides whether to keep an element
Return type: RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize(
...     [1, 2, 2, 4, 1, 3, 5, 9], 3,
... ).filter(lambda x: x % 2 == 0).collect()
[2, 2, 4]

first()

returns the first element in the dataset

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 2, 4, 1, 3, 5, 9], 3).first()
1

Works also with empty partitions:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2], 20).first()
1

flatMap(f, preservesPartitioning=True)

map followed by flatten

Parameters:
  • f – The map function.
  • preservesPartitioning – (optional) Preserve the partitioning of the original RDD. Default True.
Return type:

RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize(['hello', 'world']).flatMap(
...     lambda x: [ord(ch) for ch in x]
... ).collect()
[104, 101, 108, 108, 111, 119, 111, 114, 108, 100]

flatMapValues(f)

map operation on values in a (key, value) pair followed by a flatten

Parameters: f – The map function.
Return type: RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize([(1, 'hi'), (2, 'world')]).flatMapValues(
...     lambda x: [ord(ch) for ch in x]
... ).collect()
[(1, 104), (1, 105), (2, 119), (2, 111), (2, 114), (2, 108), (2, 100)]

fold(zeroValue, op)

fold

Parameters:
  • zeroValue – The initial value, for example 0 or 0.0.
  • op – The reduce operation.
Returns:

The folded (or aggregated) value.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 7, 2])
>>> my_rdd.fold(0, lambda a, b: a+b)
13

foldByKey(zeroValue, op)

Fold (or aggregate) value by key.

Parameters:
  • zeroValue – The initial value, for example 0 or 0.0.
  • op – The reduce operation.
Return type:

RDD

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([('a', 4), ('b', 7), ('a', 2)])
>>> my_rdd.foldByKey(0, lambda a, b: a+b).collectAsMap()['a']
6

foreach(f)

applies f to every element

Unlike RDD.map(), it does not return a new RDD.

Parameters: f – The function to apply to every element.
Return type: None

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([1, 2, 3])
>>> a = []
>>> my_rdd.foreach(lambda x: a.append(x))
>>> len(a)
3

foreachPartition(f)

applies f to every partition

Unlike RDD.mapPartitions(), it does not return a new RDD.

Parameters: f – The function to apply to every partition.
Return type: None

fullOuterJoin(other, numPartitions=None)

returns the full outer join of two RDDs

The output contains all keys from both input RDDs. Where a key is missing from one of the RDDs, the value on that side is None.

Parameters:
  • other (RDD) – The RDD to join to this one.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> sc = Context()
>>> rdd1 = sc.parallelize([('a', 0), ('b', 1)])
>>> rdd2 = sc.parallelize([('b', 2), ('c', 3)])
>>> sorted(
...     rdd1.fullOuterJoin(rdd2).collect()
... )
[('a', (0, None)), ('b', (1, 2)), ('c', (None, 3))]
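A plain-Python sketch of these full outer join semantics on lists of (key, value) pairs. It assumes PySpark's convention that a key occurring several times yields every combination of left and right values; `full_outer_join` is a hypothetical name, not pysparkling's implementation.

```python
def full_outer_join(left, right):
    """Plain-Python sketch of fullOuterJoin on two lists of (key, value) pairs."""
    left_groups, right_groups = {}, {}
    for k, v in left:
        left_groups.setdefault(k, []).append(v)
    for k, v in right:
        right_groups.setdefault(k, []).append(v)
    result = []
    for k in set(left_groups) | set(right_groups):
        # a key missing on one side contributes a single None on that side
        for lv in left_groups.get(k, [None]):
            for rv in right_groups.get(k, [None]):
                result.append((k, (lv, rv)))
    return result


print(sorted(full_outer_join([('a', 0), ('b', 1)], [('b', 2), ('c', 3)])))
# [('a', (0, None)), ('b', (1, 2)), ('c', (None, 3))]
```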

getNumPartitions()

returns the number of partitions

Return type: int

getPartitions()

returns the partitions of this RDD

groupBy(f, numPartitions=None)

group by f

Parameters:
  • f – Function returning a key given an element of the dataset.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 7, 2])
>>> my_rdd.groupBy(lambda x: x % 2).mapValues(sorted).collect()
[(0, [2, 4]), (1, [7])]

groupByKey(numPartitions=None)

group by key

Parameters: numPartitions (int) – Number of partitions in the resulting RDD.
Return type: RDD

Note

Creating the new RDD is currently implemented as a local operation.

histogram(buckets)

histogram

Parameters: buckets – A list of bucket boundaries or an int for the number of buckets.
Returns: A tuple (bucket_boundaries, histogram_values), where bucket_boundaries is a list of n+1 boundaries and histogram_values contains the count for each bucket. In the example below, the maximum value (10) is counted in an additional last entry, so histogram_values has 11 entries for 10 buckets.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([0, 4, 7, 4, 10])
>>> b, h = my_rdd.histogram(10)
>>> h
[1, 0, 0, 0, 2, 0, 0, 1, 0, 0, 1]

intersection(other)

intersection of this and other RDD

Parameters: other (RDD) – The other dataset to do the intersection with.
Return type: RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([0, 4, 7, 4, 10])
>>> rdd2 = Context().parallelize([3, 4, 7, 4, 5])
>>> rdd1.intersection(rdd2).collect()
[4, 7]

join(other, numPartitions=None)

join

Parameters:
  • other (RDD) – The other RDD.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(2, 1), (1, 3)])
>>> rdd1.join(rdd2).collect()
[(1, (1, 3))]

keyBy(f)

key by f

Parameters: f – Function that returns a key from a dataset element.
Return type: RDD

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([0, 4, 7, 4, 10])
>>> rdd.keyBy(lambda x: x % 2).collect()
[(0, 0), (0, 4), (1, 7), (0, 4), (0, 10)]

keys()

keys of a pair dataset

Return type: RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize([(0, 1), (1, 1)]).keys().collect()
[0, 1]

leftOuterJoin(other, numPartitions=None)

left outer join

Parameters:
  • other (RDD) – The other RDD.
  • numPartitions (int) – Number of partitions in the resulting RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(2, 1), (1, 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[(0, (1, None)), (1, (1, 3))]

lookup(key)

Return all values for the given key.

Parameters: key – The key to look up.
Return type: list

Example:

>>> from pysparkling import Context
>>> Context().parallelize([(0, 1), (1, 1), (1, 3)]).lookup(1)
[1, 3]

map(f)

map

Parameters: f – map function for elements
Return type: RDD

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).map(lambda x: x+1).collect()
[2, 3, 4]

mapPartitions(f, preservesPartitioning=False)

map partitions

Parameters: f – map function for partitions
Return type: RDD

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 2, 3, 4], 2)
>>> def f(iterator):
...     yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]

mapPartitionsWithIndex(f, preservesPartitioning=False)

map partitions with index

Parameters: f – map function for (index, partition)
Return type: RDD

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([9, 8, 7, 6, 5, 4], 3)
>>> def f(splitIndex, iterator):
...     yield splitIndex
>>> rdd.mapPartitionsWithIndex(f).sum()
3

mapValues(f)

map values in a pair dataset

Parameters: f – map function for values
Return type: RDD

max()

returns the maximum element

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3, 4, 3, 2], 2).max() == 4
True

mean()

returns the mean of this dataset

Example:

>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10]).mean()
5.0

min()

returns the minimum element

name()

returns the name of the dataset

partitionBy(numPartitions, partitionFunc=None)

Return a partitioned copy of this key-value RDD.

Parameters:
  • numPartitions (int) – Number of partitions.
  • partitionFunc (function) – Partition function.
Return type:

RDD

Example where even numbers get assigned to partition 0 and odd numbers to partition 1:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1)
>>> keyvalue_rdd = rdd.map(lambda x: (x, x))
>>> keyvalue_rdd.partitionBy(2).keys().collect()
[2, 8, 1, 3, 7, 5]
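The example output can be reproduced by routing each pair to partition `hash(key) % numPartitions`. This sketch assumes the default partitionFunc is hash-based. For small integers Python's `hash(n)` equals `n`, so even keys go to partition 0 and odd keys to partition 1, each in their original order. `partition_by` is a hypothetical helper.

```python
def partition_by(pairs, num_partitions, partition_func=hash):
    """Sketch: send each (key, value) pair to partition partition_func(key) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partition_func(key) % num_partitions].append((key, value))
    return partitions


pairs = [(x, x) for x in [1, 3, 2, 7, 8, 5]]
parts = partition_by(pairs, 2)
print(parts)
# flattening the partitions gives the same order as .keys().collect() above
print([k for part in parts for k, _ in part])  # [2, 8, 1, 3, 7, 5]
```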

persist(storageLevel=None)

Cache the results of computed partitions.

Parameters: storageLevel – Not used.

pipe(command, env=None)

Run a command with each element of the dataset as an argument.

Parameters:
  • command – Command line command to run.
  • env (dict) – environment variables
Return type:

RDD

Warning

Unsafe for untrusted data.

Example:

>>> from pysparkling import Context
>>> piped = Context().parallelize(['0', 'hello', 'world']).pipe('echo')
>>> b'hello\n' in piped.collect()
True

randomSplit(weights, seed=None)

Split the RDD into a few RDDs according to the given weights.

Parameters:
  • weights (list[float]) – relative lengths of the resulting RDDs
  • seed (int) – seed for random number generator
Returns:

a list of RDDs

Return type:

list

Note

Creating the new RDDs is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize(range(500))
>>> rdd1, rdd2 = rdd.randomSplit([2, 3], seed=42)
>>> (rdd1.count(), rdd2.count())
(199, 301)

reduce(f)

reduce

Parameters: f – A commutative and associative binary operator.

Example:

>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10]).reduce(lambda a, b: a+b)
25

reduceByKey(f)

reduce by key

Parameters: f – A commutative and associative binary operator.
Return type: RDD

Note

This operation includes a pysparkling.RDD.groupByKey() which is a local operation.

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([(0, 1), (1, 1), (1, 3)])
>>> rdd.reduceByKey(lambda a, b: a+b).collect()
[(0, 1), (1, 4)]
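Following the note above, reduceByKey can be thought of as grouping the values per key and then reducing each group with f. A plain-Python sketch of that idea (`reduce_by_key` is hypothetical, not pysparkling's code):

```python
from functools import reduce


def reduce_by_key(pairs, f):
    """Sketch: group values per key (like groupByKey), then reduce each group with f."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    # dicts keep insertion order, so keys appear in order of first occurrence
    return [(key, reduce(f, values)) for key, values in groups.items()]


print(reduce_by_key([(0, 1), (1, 1), (1, 3)], lambda a, b: a + b))  # [(0, 1), (1, 4)]
```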

repartition(numPartitions)

repartition

Parameters: numPartitions (int) – Number of partitions in the new RDD.
Return type: RDD

Note

Creating the new RDD is currently implemented as a local operation.

repartitionAndSortWithinPartitions(numPartitions=None, partitionFunc=None, ascending=True, keyfunc=None)

Repartition and sort within each partition.

Parameters:
  • numPartitions (int) – Number of partitions in new RDD.
  • partitionFunc – function that partitions
  • ascending – Default is True.
  • keyfunc – Returns the value that will be sorted.
Return type:

RDD

Example where even numbers are assigned to partition 0 and odd numbers to partition 1 and then the partitions are sorted individually:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd = sc.parallelize([1, 3, 2, 7, 8, 5], 1)
>>> kv_rdd = rdd.map(lambda x: (x, x))
>>> processed = kv_rdd.repartitionAndSortWithinPartitions(2)
>>> processed.keys().collect()
[2, 8, 1, 3, 5, 7]

rightOuterJoin(other, numPartitions=None)

right outer join

Parameters:
  • other (RDD) – The other RDD.
  • numPartitions (int) – Number of partitions in new RDD.
Return type:

RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> rdd1 = sc.parallelize([(0, 1), (1, 1)])
>>> rdd2 = sc.parallelize([(2, 1), (1, 3)])
>>> sorted(rdd1.rightOuterJoin(rdd2).collect())
[(1, (1, 3)), (2, (None, 1))]

sample(withReplacement, fraction, seed=None)

randomly sample

Parameters:
  • withReplacement (bool) – sample with replacement
  • fraction (float) – probability that an element is sampled
  • seed – (optional) Seed for random number generator
Return type:

RDD

Sampling without replacement uses Bernoulli sampling, where fraction is the probability that an element is sampled. Sampling with replacement uses Poisson sampling, where fraction is the expected number of times an element is sampled.

Example:

>>> from pysparkling import Context
>>> rdd = Context().parallelize(range(1000))
>>> sampled = rdd.sample(False, 0.1, seed=5).collect()
>>> len(sampled)
115
>>> sampled_with_replacement = rdd.sample(True, 5.0, seed=5).collect()
>>> len(sampled_with_replacement) in (5067, 5111)  # w/o, w/ numpy
True
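The two sampling modes described above can be sketched with the standard library's `random` module. Bernoulli sampling keeps each element with probability fraction; Poisson sampling repeats each element a Poisson-distributed number of times with mean fraction. Since the standard library has no Poisson sampler, the sketch draws counts with Knuth's multiplication method. It will not reproduce the exact counts of the doctest, which depend on pysparkling's own random number usage (and, per its comment, on whether numpy is installed).

```python
import math
import random


def poisson(rng, lam):
    """Draw a Poisson(lam) count with Knuth's multiplication method (fine for small lam)."""
    threshold = math.exp(-lam)
    k, p = 0, rng.random()
    while p > threshold:
        k += 1
        p *= rng.random()
    return k


def sample(data, with_replacement, fraction, seed=None):
    """Hypothetical sketch of Bernoulli / Poisson sampling over a sequence."""
    rng = random.Random(seed)
    out = []
    for x in data:
        if with_replacement:
            # Poisson sampling: x appears k times, with E[k] == fraction
            out.extend([x] * poisson(rng, fraction))
        elif rng.random() < fraction:
            # Bernoulli sampling: keep x with probability fraction
            out.append(x)
    return out


without = sample(range(1000), False, 0.1, seed=5)
with_repl = sample(range(1000), True, 5.0, seed=5)
print(len(without), len(with_repl))  # roughly 100 and 5000
```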

sampleByKey(withReplacement, fractions, seed=None)

randomly sample by key

Parameters:
  • withReplacement (bool) – sample with replacement
  • fractions (dict) – per key sample probabilities
  • seed – (optional) Seed for random number generator.
Return type:

RDD

Sampling without replacement uses Bernoulli sampling, where the fraction for a key is the probability that an element with that key is sampled. Sampling with replacement uses Poisson sampling, where the fraction is the expected number of times such an element is sampled.

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> fractions = {"a": 0.2, "b": 0.1}
>>> rdd = (sc
...        .parallelize(fractions.keys())
...        .cartesian(sc.parallelize(range(0, 1000))))
>>> sample = (rdd
...           .sampleByKey(False, fractions, 2)
...           .groupByKey().collectAsMap())
>>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
True
>>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
True
>>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
True

sampleStdev()

sample standard deviation

Return type: float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).sampleStdev()
1.0

sampleVariance()

sample variance

Return type: float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1, 2, 3]).sampleVariance()
1.0

saveAsPickleFile(path, batchSize=10)

save as pickle file

Returns: self
Return type: RDD

Warning

The output of this function is incompatible with the PySpark output as there is no pure Python way to write Sequence files.

Example:

>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> d = ['hello', 'world', 1, 2]
>>> rdd = Context().parallelize(d).saveAsPickleFile(tmpFile.name)
>>> 'hello' in Context().pickleFile(tmpFile.name).collect()
True

saveAsTextFile(path, compressionCodecClass=None)

save as text file

If the RDD has a single partition, its contents are stored directly in the given path. If the RDD has more partitions, the data of each partition is stored in an individual file under path/part-00000 and so on, and once all partitions are written, the file path/_SUCCESS is written last.

Parameters:
  • path – Destination of the text file.
  • compressionCodecClass – Not used.
Returns:

self

Return type:

RDD
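For orientation, this standard-library sketch writes the multi-partition layout described above: one `part-NNNNN` file per partition and a `_SUCCESS` marker written last. The helper name and the one-element-per-line format are assumptions for illustration, not pysparkling's implementation.

```python
import os
import tempfile


def save_partitions_as_text(partitions, path):
    """Hypothetical helper mimicking the multi-partition layout of saveAsTextFile."""
    os.makedirs(path)
    for i, partition in enumerate(partitions):
        # one file per partition: part-00000, part-00001, ...
        with open(os.path.join(path, 'part-{:05d}'.format(i)), 'w') as f:
            for element in partition:
                f.write('{}\n'.format(element))  # assumed: one element per line
    # the marker file is created only after all partitions are written
    open(os.path.join(path, '_SUCCESS'), 'w').close()


out_dir = os.path.join(tempfile.mkdtemp(), 'output.txt')
save_partitions_as_text([['a', 'b'], ['c']], out_dir)
print(sorted(os.listdir(out_dir)))  # ['_SUCCESS', 'part-00000', 'part-00001']
```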

sortBy(keyfunc, ascending=True, numPartitions=None)

sort by keyfunc

Parameters:
  • keyfunc – Returns the value that will be sorted.
  • ascending – Default is True.
  • numPartitions (int) – Default is None. None means the output will have the same number of partitions as the input.
Return type:

RDD

Note

Sorting is currently implemented as a local operation.

Examples:

>>> from pysparkling import Context
>>> rdd = Context().parallelize([5, 1, 2, 3])
>>> rdd.sortBy(lambda x: x).collect()
[1, 2, 3, 5]
>>> from pysparkling import Context
>>> rdd = Context().parallelize([1, 5, 2, 3])
>>> rdd.sortBy(lambda x: x, ascending=False).collect()
[5, 3, 2, 1]

sortByKey(ascending=True, numPartitions=None, keyfunc=operator.itemgetter(0))

sort by key

Parameters:
  • ascending – Default is True.
  • numPartitions (int) – Default is None. None means the output will have the same number of partitions as the input.
  • keyfunc – Returns the value that will be sorted.
Return type:

RDD

Note

Sorting is currently implemented as a local operation.

Examples:

>>> from pysparkling import Context
>>> rdd = Context().parallelize(
...     [(5, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
... )
>>> rdd.sortByKey().collect()[0][1] == 'b'
True
>>> from pysparkling import Context
>>> rdd = Context().parallelize(
...     [(1, 'b'), (5, 'a'), (2, 'c'), (3, 'd')]
... )
>>> rdd.sortByKey(ascending=False).collect()[0][1] == 'a'
True

stats()

stats

Return type: StatCounter

Example:

>>> from pysparkling import Context
>>> d = [1, 4, 9, 16, 25, 36]
>>> s = Context().parallelize(d, 3).stats()
>>> sum(d)/len(d) == s.mean()
True

stdev()

standard deviation

Return type: float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1.5, 2.5]).stdev()
0.5

subtract(other, numPartitions=None)

subtract

Parameters:
  • other (RDD) – The RDD to subtract from the current RDD.
  • numPartitions (int) – Currently not used. Partitions are preserved.
Return type:

RDD

Example:

>>> from pysparkling import Context
>>> rdd1 = Context().parallelize([(0, 1), (1, 1)])
>>> rdd2 = Context().parallelize([(1, 1), (1, 3)])
>>> rdd1.subtract(rdd2).collect()
[(0, 1)]

sum()

sum of all the elements

Return type: float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([0, 4, 7, 4, 10]).sum()
25

take(n)

Take n elements and return them in a list.

Only evaluates the partitions that are necessary to return n elements.

Parameters: n (int) – Number of elements to return.
Return type: list

Example:

>>> from pysparkling import Context
>>> Context().parallelize([4, 7, 2]).take(2)
[4, 7]

Another example where only the first two partitions are computed (check the debug logs):

>>> from pysparkling import Context
>>> Context().parallelize([4, 7, 2], 3).take(2)
[4, 7]
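The lazy behavior of take() can be modeled in plain Python: partitions are represented as functions that are only called when their elements are needed, and `itertools.islice` stops the iteration after n elements. The `computed` list records which partitions were evaluated. Everything here is a hypothetical model, not pysparkling internals.

```python
import itertools


def take(partition_funcs, n):
    """Hypothetical model: evaluate partitions lazily and stop after n elements."""
    computed = []  # indices of the partitions that were actually evaluated

    def elements():
        for i, compute in enumerate(partition_funcs):
            computed.append(i)
            yield from compute()

    result = list(itertools.islice(elements(), n))
    return result, computed


# three single-element partitions, like parallelize([4, 7, 2], 3)
parts = [lambda: [4], lambda: [7], lambda: [2]]
print(take(parts, 2))  # ([4, 7], [0, 1])
```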

takeSample(n)

take sample

Assumes samples are evenly distributed between partitions. Only evaluates the partitions that are necessary to return n elements.

Parameters: n (int) – The number of elements to sample.
Return type: list

Example:

>>> from pysparkling import Context
>>> Context().parallelize([4, 7, 2]).takeSample(1)[0] in [4, 7, 2]
True

Another example where only one partition is computed (check the debug logs):

>>> from pysparkling import Context
>>> d = [4, 9, 7, 3, 2, 5]
>>> Context().parallelize(d, 3).takeSample(1)[0] in d
True

toLocalIterator()

Returns an iterator over the dataset.

Example:

>>> from pysparkling import Context
>>> sum(Context().parallelize([4, 9, 7, 3, 2, 5], 3).toLocalIterator())
30

top(num, key=None)

Top N elements in descending order.

Parameters:
  • num (int) – number of elements
  • key – optional key function
Return type:

list

Example:

>>> from pysparkling import Context
>>> r = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> r.top(2)
[9, 7]
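To check expected results locally, top(num, key) should agree with the standard library's `heapq.nlargest(num, iterable, key=key)` applied to the collected data. This equivalence is an assumption based on the description above, not a statement about pysparkling's implementation.

```python
import heapq

data = [4, 9, 7, 3, 2, 5]
print(heapq.nlargest(2, data))  # [9, 7], like r.top(2)
# with a key function: the two elements with the largest -x, i.e. the two smallest
print(heapq.nlargest(2, data, key=lambda x: -x))  # [2, 3]
```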

union(other)

union

Parameters: other (RDD) – The other RDD for the union.
Return type: RDD

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> my_rdd.union(my_rdd).count()
12

values()

Values of a (key, value) dataset.

Return type: RDD

variance()

The variance of the dataset.

Return type: float

Example:

>>> from pysparkling import Context
>>> Context().parallelize([1.5, 2.5]).variance()
0.25
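The doctests above are consistent with variance() and stdev() being population statistics (dividing by n) and sampleVariance() and sampleStdev() dividing by n - 1. A plain-Python sketch of both conventions, cross-checked against the `statistics` module; the `ddof` parameter name is borrowed from NumPy and is not part of pysparkling.

```python
import math
import statistics


def variance(values, ddof=0):
    """Population variance for ddof=0, sample variance (n - 1 denominator) for ddof=1."""
    n = len(values)
    mean = sum(values) / n
    return sum((x - mean) ** 2 for x in values) / (n - ddof)


print(variance([1.5, 2.5]))             # 0.25, like RDD.variance()
print(math.sqrt(variance([1.5, 2.5])))  # 0.5, like RDD.stdev()
print(variance([1, 2, 3], ddof=1))      # 1.0, like RDD.sampleVariance()
# the standard library follows the same two conventions
print(statistics.pvariance([1.5, 2.5]), statistics.variance([1, 2, 3]))
```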

zip(other)

zip

Parameters: other (RDD) – Other dataset to zip with.
Return type: RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> my_rdd.zip(my_rdd).collect()
[(4, 4), (9, 9), (7, 7), (3, 3), (2, 2), (5, 5)]

zipWithIndex()

Returns pairs of an original element and its index.

Return type: RDD

Note

Creating the new RDD is currently implemented as a local operation.

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([4, 9, 7, 3, 2, 5], 3)
>>> my_rdd.zipWithIndex().collect()
[(4, 0), (9, 1), (7, 2), (3, 3), (2, 4), (5, 5)]

zipWithUniqueId()

Zip every entry with a unique index.

This is a fast operation.

Return type:RDD

Example:

>>> from pysparkling import Context
>>> my_rdd = Context().parallelize([423, 234, 986, 5, 345], 3)
>>> my_rdd.zipWithUniqueId().collect()
[(423, 0), (234, 1), (986, 4), (5, 2), (345, 5)]
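The doctest output is consistent with the scheme PySpark uses for zipWithUniqueId, where element k of partition i out of n partitions gets the id k * n + i, provided parallelize split the five elements into [423], [234, 986] and [5, 345] (an assumption inferred from the output). The sketch also suggests why this is cheap compared with zipWithIndex(): consecutive indices require the sizes of all earlier partitions, while unique ids do not. Both helpers are hypothetical.

```python
def zip_with_index(partitions):
    """Consecutive indices: needs the sizes of all earlier partitions first."""
    result, offset = [], 0
    for part in partitions:
        result.extend((x, offset + k) for k, x in enumerate(part))
        offset += len(part)
    return result


def zip_with_unique_id(partitions):
    """Element k of partition i gets id k * n + i; no partition sizes needed."""
    n = len(partitions)
    return [(x, k * n + i)
            for i, part in enumerate(partitions)
            for k, x in enumerate(part)]


# assumed split of [423, 234, 986, 5, 345] into 3 partitions
parts = [[423], [234, 986], [5, 345]]
print(zip_with_unique_id(parts))  # [(423, 0), (234, 1), (986, 4), (5, 2), (345, 5)]
print(zip_with_index(parts))      # [(423, 0), (234, 1), (986, 2), (5, 3), (345, 4)]
```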
class pysparkling.StatCounter(values=None)

Context

A Context describes the setup. Instantiating a Context with the default arguments using Context() is the most lightweight setup. All data is just in the local thread and is never serialized or deserialized.

If you want to process the data in parallel, you can use the multiprocessing module. Because the default pickle serializer cannot handle functions such as lambdas, you can have all functions serialized with cloudpickle instead. For example, a common instantiation with multiprocessing looks like this:

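import multiprocessing
import pickle
import cloudpickle
from pysparkling import Context

c = Context(
    multiprocessing.Pool(4),  # four worker processes
    serializer=cloudpickle.dumps,  # functions, including lambdas
    deserializer=pickle.loads,  # cloudpickle output loads with pickle
)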

This assumes that your data is serializable with pickle, which is generally faster than cloudpickle. You can also specify a custom serializer and deserializer for the data with data_serializer and data_deserializer.
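The reason for swapping only the function serializer is that the standard pickle module serializes functions by reference to an importable name, so lambdas cannot be pickled, while plain data such as lists of tuples round-trips fine. A small check (can_pickle is a hypothetical helper):

```python
import pickle


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(can_pickle([('a', 1), ('b', 2)]))  # True: plain data works with pickle
print(can_pickle(lambda x: x + 1))       # False: lambdas need cloudpickle
```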

class pysparkling.Context(pool=None, serializer=None, deserializer=None, data_serializer=None, data_deserializer=None, max_retries=3, retry_wait=0.0)

Context object similar to a Spark Context.

The variable _stats contains measured timing information about data and function (de)serialization and workload execution to benchmark your jobs.

Parameters:
  • pool – An instance with a map(func, iterable) method.
  • serializer – Serializer for functions. Examples are pickle.dumps and dill.dumps.
  • deserializer – Deserializer for functions. Examples are pickle.loads and dill.loads.
  • data_serializer – Serializer for the data.
  • data_deserializer – Deserializer for the data.
  • max_retries (int) – maximum number of times a partition is retried
  • retry_wait (float) – seconds to wait between retries
parallelize(x, numPartitions=None)

Parallelize x.

Parameters:
  • x – An iterable (e.g. a list) that represents the data.
  • numPartitions (int|None) – (optional) The number of partitions the data should be split into. A partition is a unit of data that is processed at a time. Can be None.
Return type:

RDD
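How numPartitions splits the data is an implementation detail. As a hedged sketch, assuming a contiguous split into nearly equal chunks, five elements in three partitions become [423], [234, 986] and [5, 345], which is consistent with the zipWithUniqueId() example; split_into_partitions is a hypothetical helper, not pysparkling's code.

```python
def split_into_partitions(data, num_partitions):
    """Split data into num_partitions contiguous chunks of nearly equal size."""
    data = list(data)
    # Boundaries at len(data) * i // num_partitions for i = 0 .. num_partitions.
    bounds = [len(data) * i // num_partitions for i in range(num_partitions + 1)]
    return [data[start:end] for start, end in zip(bounds[:-1], bounds[1:])]


print(split_into_partitions([423, 234, 986, 5, 345], 3))
# [[423], [234, 986], [5, 345]]
print(split_into_partitions([4, 9, 7, 3, 2, 5], 3))
# [[4, 9], [7, 3], [2, 5]]
```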

pickleFile(name, minPartitions=None)

Read a pickle file.

Reads files created with RDD.saveAsPickleFile() into an RDD.

Parameters:
  • name – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
Return type:

RDD

Example with a serialized list:

>>> import pickle
>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> with open(tmpFile.name, 'wb') as f:
...     pickle.dump(['hello', 'world'], f)
>>> Context().pickleFile(tmpFile.name).collect()[0] == 'hello'
True
runJob(rdd, func, partitions=None, allowLocal=False, resultHandler=None)

This function is used by methods in the RDD.

Note that the map functions are only evaluated lazily inside generators, and the resultHandler is responsible for executing the ones it needs. In other words, if you need everything to be executed, the resultHandler needs to be at least lambda x: list(x) to trigger execution of the generators.

Parameters:
  • func – Map function. The signature is func(TaskContext, Iterator over elements).
  • partitions – (optional) List of partitions that are involved. Default is None, meaning the map job is applied to all partitions.
  • allowLocal – (optional) Allows for local execution. Default is False.
  • resultHandler – (optional) Process the result from the maps.
Returns:

Result of resultHandler.
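The lazy evaluation described in the note can be illustrated without pysparkling: if each map call is wrapped in a generator, nothing runs until the result handler consumes it. In this sketch run_job_sketch and count_elements are hypothetical, and the partition index stands in for the TaskContext argument.

```python
calls = []


def count_elements(task_context, iterator):
    """Map function with the (TaskContext, iterator) shape; records each call."""
    calls.append(task_context)
    return sum(1 for _ in iterator)


def run_job_sketch(partitions, func, result_handler=None):
    # The maps live inside a generator: creating it executes nothing.
    maps = (func(index, iter(part)) for index, part in enumerate(partitions))
    return result_handler(maps) if result_handler is not None else maps


run_job_sketch([[1, 2], [3]], count_elements)  # generator is never consumed
print(calls)  # []
print(run_job_sketch([[1, 2], [3]], count_elements, lambda x: list(x)))  # [2, 1]
print(calls)  # [0, 1]
```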

binaryFiles(path, minPartitions=None)

Read a binary file into an RDD.

Parameters:
  • path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
Return type:

RDD

Warning

Not part of PySpark API.

Setting up examples:

>>> import os, pysparkling
>>> from backports import tempfile
>>> sc = pysparkling.Context()
>>> decode = lambda bstring: bstring.decode()

Example with whole file:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryFiles(tmp+'*').mapValues(decode).collect()
[('...', 'bellobello')]
binaryRecords(path, recordLength=None)

Read a binary file into an RDD.

Parameters:
  • path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • recordLength – If None every file is a record, int means fixed length records and a string is used as a format string to struct to read the length of variable length binary records.
Return type:

RDD

Warning

Only an int recordLength is part of the PySpark API.

Setting up examples:

>>> import os, pysparkling
>>> from backports import tempfile
>>> sc = pysparkling.Context()
>>> decode = lambda bstring: bstring.decode()

Example with whole file:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryRecords(tmp+'*').map(decode).collect()
['bellobello']

Example with fixed length records:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryRecords(tmp+'*', recordLength=5).map(decode).collect()
['bello', 'bello']

Example with variable length records:

>>> import struct
>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(struct.pack('<I', 5) + b'bello')
...         _ = f.write(struct.pack('<I', 10) + b'bellobello')
...     (sc.binaryRecords(tmp+'*', recordLength='<I')
...      .map(decode).collect())
['bello', 'bellobello']
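With a struct format string as recordLength, each record is preceded by its length encoded in that format. The framing can be parsed in plain Python as sketched below; read_length_prefixed is a hypothetical helper that follows the format written in the example above, not pysparkling's reader.

```python
import struct


def read_length_prefixed(data, fmt='<I'):
    """Split bytes into records, each preceded by a struct-encoded length."""
    header_size = struct.calcsize(fmt)
    records, offset = [], 0
    while offset < len(data):
        (length,) = struct.unpack(fmt, data[offset:offset + header_size])
        offset += header_size
        records.append(data[offset:offset + length])
        offset += length
    return records


payload = struct.pack('<I', 5) + b'bello' + struct.pack('<I', 10) + b'bellobello'
print(read_length_prefixed(payload))  # [b'bello', b'bellobello']
```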
textFile(filename, minPartitions=None, use_unicode=True)

Read a text file into an RDD.

Parameters:
  • filename – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
  • use_unicode – (optional, default=True) Use utf8 if True and ascii if False.
Return type:

RDD

union(rdds)

Create a union of rdds.

Parameters:rdds – Iterable of RDDs.
Return type:RDD
wholeTextFiles(path, minPartitions=None, use_unicode=True)

Read text files into an RDD of pairs of file name and file content.

Parameters:
  • path – Location of the files. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting them further.
  • use_unicode – (optional, default=True) Use utf8 if True and ascii if False.
Return type:

RDD

Streaming

Warning

This is a new addition to the API (March 2017) that should only be used with care.

StreamingContext

class pysparkling.streaming.StreamingContext(sparkContext, batchDuration=None)

Stream processing.

Parameters:
  • sparkContext (pysparkling.Context) – A pysparkling.Context.
  • batchDuration (float) – Duration in seconds per batch.
sparkContext

Return context of this StreamingContext.

awaitTermination(timeout=None)

Wait for context to stop.

Parameters:timeout (float) – in seconds
awaitTerminationOrTimeout(timeout)

Provided for compatibility. Same as awaitTermination() here.

binaryRecordsStream(directory, recordLength=None, process_all=False)

Monitor a directory and process all binary files.

File names starting with . are ignored.

Parameters:
  • directory (string) – a path
  • recordLength – None, int or struct format string
  • process_all (bool) – whether to process pre-existing files
Return type:

DStream

Warning

Only an int recordLength is supported in the PySpark API. The process_all parameter does not exist in the PySpark API.

queueStream(rdds, oneAtATime=True, default=None)

Create a stream that iterates over the given RDDs.

Parameters:
  • rdds – Iterable over RDDs or lists.
  • oneAtATime – Pick one RDD per batch (True) or all of them at once (False).
  • default – Returned once rdds is exhausted. Can be None.
Return type:

DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2], [7]])
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[4]
[2]
[7]

Example testing the default value:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2]], default=['placeholder'])
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[4]
[2]
['placeholder']
remember(duration)

Provided for compatibility, but does nothing here.

socketBinaryStream(hostname, port, length)

Create a TCP socket server for binary input.

Warning

This is not part of the PySpark API.

Parameters:
  • hostname (string) – Hostname of TCP server.
  • port (int) – Port of TCP server.
  • length

    Message length. Length in bytes or a format string for struct.unpack().

    For variable length messages where the message length is sent right before the message itself, length is a format string that can be passed to struct.unpack(). For example, use length='<I' for a little-endian (standard on x86) 32-bit unsigned int.

Return type:

DStream
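On the sending side, a client for a stream created with length='<I' has to prefix every message with its length packed as a little-endian 32-bit unsigned int. A minimal client sketch; frame and send_messages are hypothetical helpers, and hostname and port are whatever you passed to socketBinaryStream().

```python
import socket
import struct


def frame(message):
    """Prefix message (bytes) with its length as a little-endian 32-bit unsigned int."""
    return struct.pack('<I', len(message)) + message


def send_messages(hostname, port, messages):
    """Send length-prefixed messages to the TCP server at hostname:port."""
    with socket.create_connection((hostname, port)) as conn:
        for message in messages:
            conn.sendall(frame(message))


print(frame(b'hello'))  # b'\x05\x00\x00\x00hello'
```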

socketTextStream(hostname, port)

Create a TCP socket server.

Parameters:
  • hostname (string) – Hostname of TCP server.
  • port (int) – Port of TCP server.
Return type:

DStream

start()

Start processing streams.

stop(stopSparkContext=True, stopGraceFully=False)

Stop processing streams.

Parameters:
  • stopSparkContext – stop the SparkContext (NOT IMPLEMENTED)
  • stopGraceFully – stop gracefully (NOT IMPLEMENTED)
textFileStream(directory, process_all=False)

Monitor a directory and process all text files.

File names starting with . are ignored.

Parameters:
  • directory (string) – a path
  • process_all (bool) – whether to process pre-existing files
Return type:

DStream

Warning

The process_all parameter does not exist in the PySpark API.

fileTextStream(directory, process_all=False)

Alias of textFileStream().

fileBinaryStream(directory, recordLength=None, process_all=False)

Alias of binaryRecordsStream().

DStream

class pysparkling.streaming.DStream(jdstream, ssc, jrdd_deserializer=None)

A discrete stream of RDDs.

Usually a DStream is created by a pysparkling.streaming.StreamingContext method like pysparkling.streaming.StreamingContext.queueStream() and then operated on with the methods below.

Parameters:
  • jdstream – previous stream
  • ssc (StreamingContext) – the streaming context
  • jrdd_deserializer – a deserializer callable
cache()

Cache RDDs.

Return type:DStream
cogroup(other, numPartitions=None)

Apply cogroup to RDDs of this and other DStream.

Parameters:other (DStream) – another DStream
Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('b', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> s1.cogroup(s2).foreachRDD(lambda rdd: print(sorted(rdd.collect())))
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', [[4], [1]]), ('b', [[2], [3]])]
[('c', [[7], [8]])]
context()

Return the StreamingContext of this stream.

Return type:StreamingContext
count()

Count elements per RDD.

Creates a new RDD stream where each RDD has a single entry that is the count of the elements.

Return type:DStream
countByValue()

Apply countByValue to every RDD.

Return type:DStream

Warning

Implemented as a local operation.

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 1, 5, 5, 5, 2]])
...     .countByValue()
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.15)
[(1, 2), (2, 1), (5, 3)]
countByWindow(windowDuration, slideDuration=None)

Applies count() after window().

Parameters:
  • windowDuration (float) – window length in seconds, a multiple of the batch interval
  • slideDuration (float) – sliding interval in seconds, a multiple of the batch interval
Return type:

DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[1, 1, 5], [5, 5, 2, 4], [1, 2]])
...     .countByWindow(0.2)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[3]
[7]
[6]
filter(f)

Filter elements.

Parameters:f – filter function
Return type:DStream
flatMap(f, preservesPartitioning=False)

Apply function f and flatten.

Parameters:f – mapping function
Return type:DStream
flatMapValues(f)

Apply f to each value of a key-value pair and flatten the results, keeping the key.

Parameters:f – map function
Return type:DStream
foreachRDD(func)

Apply func to every RDD of the stream.

Parameters:func – Function to apply.
fullOuterJoin(other, numPartitions=None)

Apply fullOuterJoin to each pair of RDDs.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('b', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.fullOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1)), ('b', (2, 3))]
[('c', (7, 8))]

Example with repeated keys:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('a', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('b', 1)], [('c', 8)]])
>>> (
...     s1.fullOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (2, None)), ('a', (4, None)), ('b', (None, 1))]
[('c', (7, 8))]
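Within each batch, fullOuterJoin, join, leftOuterJoin and rightOuterJoin differ only in which unmatched keys they keep. The sketch below reproduces the per-batch outputs of the join doctests in this section on plain lists of (key, value) pairs; join_pairs is a hypothetical helper, not pysparkling's implementation.

```python
def join_pairs(left, right, keep_left=False, keep_right=False):
    """Join two lists of (key, value) pairs.

    keep_left / keep_right keep unmatched keys from that side, paired with None:
    neither -> join, keep_left -> leftOuterJoin,
    keep_right -> rightOuterJoin, both -> fullOuterJoin.
    """
    left_keys = {key for key, _ in left}
    result = []
    for key, value in left:
        matches = [other for other_key, other in right if other_key == key]
        result.extend((key, (value, other)) for other in matches)
        if not matches and keep_left:
            result.append((key, (value, None)))
    if keep_right:
        result.extend((k, (None, w)) for k, w in right if k not in left_keys)
    return result


print(sorted(join_pairs([('a', 4), ('a', 2)], [('b', 1)], True, True)))
# [('a', (2, None)), ('a', (4, None)), ('b', (None, 1))]
```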
groupByKey()

Apply groupByKey to every RDD.

Return type:DStream
join(other, numPartitions=None)

Apply join to each pair of RDDs.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.join(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1))]
[('c', (7, 8))]
leftOuterJoin(other, numPartitions=None)

Apply leftOuterJoin to each pair of RDDs.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.leftOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1)), ('e', (2, None))]
[('c', (7, 8))]
map(f, preservesPartitioning=False)

Apply function f to every element.

Parameters:f – mapping function
Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[4], [2], [7]])
...     .map(lambda e: e + 1)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[5]
[3]
[8]
mapPartitions(f, preservesPartitioning=False)

Map partitions.

Parameters:f – mapping function
Return type:DStream
mapPartitionsWithIndex(f, preservesPartitioning=False)

Apply a map function that takes an index and the data.

Map partitions with a function that takes the partition index and an iterator over the partition data as arguments.

Parameters:f – mapping function
Return type:DStream
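The function passed to mapPartitionsWithIndex() receives the partition index and an iterator over that partition's data, and returns an iterable. A sketch of such a function, exercised here on plain lists standing in for partitions; tag_with_partition is a hypothetical name, and with a stream you would pass it as stream.mapPartitionsWithIndex(tag_with_partition).

```python
def tag_with_partition(index, iterator):
    """Yield (partition index, element) for every element of the partition."""
    for element in iterator:
        yield (index, element)


partitions = [['a', 'b'], ['c']]  # stand-ins for the partitions of one RDD
tagged = [
    pair
    for index, partition in enumerate(partitions)
    for pair in tag_with_partition(index, iter(partition))
]
print(tagged)  # [(0, 'a'), (0, 'b'), (1, 'c')]
```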
mapValues(f)

Apply f to the value of every key-value pair, keeping the key.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([[('a', 4)], [('b', 2)], [('c', 7)]])
...     .mapValues(lambda e: e + 1)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[('a', 5)]
[('b', 3)]
[('c', 8)]
pprint(num=10)

Print the first num elements of each RDD.

Parameters:num (int) – Set number of elements to be printed.
reduce(func)

Return a new DStream where each RDD was reduced with func.

Return type:DStream
reduceByKey(func, numPartitions=None)

Apply reduceByKey to every RDD.

Parameters:
  • func – reduce function to apply
  • numPartitions (int) – number of partitions
Return type:

DStream

repartition(numPartitions)

Repartition every RDD.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> (
...     ssc
...     .queueStream([['hello', 'world']])
...     .repartition(2)
...     .foreachRDD(lambda rdd: print(len(rdd.partitions())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
2
0
rightOuterJoin(other, numPartitions=None)

Apply rightOuterJoin to each pair of RDDs.

Return type:DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> s1 = ssc.queueStream([[('a', 4), ('e', 2)], [('c', 7)]])
>>> s2 = ssc.queueStream([[('a', 1), ('b', 3)], [('c', 8)]])
>>> (
...     s1.rightOuterJoin(s2)
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.25)
[('a', (4, 1)), ('b', (None, 3))]
[('c', (7, 8))]
saveAsTextFiles(prefix, suffix=None)

Save every RDD as a text file (or a set of text files).

Parameters:
  • prefix (string) – path prefix of the output
  • suffix (string) – file suffix (e.g. ‘.gz’ to enable compression)

Example:

>>> from backports import tempfile
>>> import os, pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     (
...         ssc.queueStream([['hello', 'world'], [1, 2]])
...         .saveAsTextFiles(os.path.join(tmp_dir, 'textout'))
...     )
...     ssc.start()
...     ssc.awaitTermination(0.25)
...     result = sc.textFile(tmp_dir + '*').collect()
>>> result
['hello', 'world', '1', '2']
slice(begin, end)

Filter RDDs to those between begin and end.

Parameters:
  • begin (datetime.datetime|int) – datetime or unix timestamp
  • end (datetime.datetime|int) – datetime or unix timestamp
Return type:

DStream

transform(func)

Return a new DStream where each RDD is transformed by func.

Parameters:func – Function that transforms an RDD.
Return type:DStream
transformWith(func, other, keepSerializer=False)

Return a new DStream where each RDD is transformed by func together with the corresponding RDD of other.

Parameters:func – transformation function
Return type:DStream

The transformation function can have arguments (time, rdd_a, rdd_b) or (rdd_a, rdd_b).

union(other)

Union of two DStreams.

Parameters:other (DStream) – Another DStream.

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.1)
>>> odd = ssc.queueStream([[1], [3], [5]])
>>> even = ssc.queueStream([[2], [4], [6]])
>>> (
...     odd.union(even)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.35)
[1, 2]
[3, 4]
[5, 6]
updateStateByKey(func)

Maintain a state per key across batches.

Parameters:func – Evaluated per key. Takes the list of new input_values for that key and its previous state (None for a new key) and returns the new state.
Return type:DStream

This example shows how to return the latest value per key:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2)
>>> (
...     ssc
...     .queueStream([[('a', 1), ('b', 3)], [('a', 2), ('c', 4)]])
...     .updateStateByKey(lambda input_values, state:
...                       state
...                       if not input_values
...                       else input_values[-1])
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.5)
[('a', 1), ('b', 3)]
[('a', 2), ('b', 3), ('c', 4)]

This example counts values per key:

>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2)
>>> (
...     ssc
...     .queueStream([[('a', 1)], [('a', 2), ('b', 4), ('b', 3)]])
...     .updateStateByKey(lambda input_values, state:
...                       (state if state is not None else 0) +
...                       sum(input_values))
...     .foreachRDD(lambda rdd: print(sorted(rdd.collect())))
... )
>>> ssc.start()
>>> ssc.awaitTermination(0.5)
[('a', 1)]
[('a', 3), ('b', 7)]
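The update rule can be replayed on plain lists to see how the state evolves from batch to batch: new values are grouped per key, and func is called for every key that has new values or an existing state, with None as the initial state. update_state_sketch is a hypothetical helper that reproduces both examples above; it does not model every detail of the streaming implementation.

```python
def update_state_sketch(batches, func):
    """Apply func(new_values, state) per key after every batch; return snapshots."""
    state = {}
    snapshots = []
    for batch in batches:
        new_values = {}
        for key, value in batch:
            new_values.setdefault(key, []).append(value)
        for key in set(state) | set(new_values):
            state[key] = func(new_values.get(key, []), state.get(key))
        snapshots.append(sorted(state.items()))
    return snapshots


def count(input_values, state):
    return (state if state is not None else 0) + sum(input_values)


print(update_state_sketch([[('a', 1)], [('a', 2), ('b', 4), ('b', 3)]], count))
# [[('a', 1)], [('a', 3), ('b', 7)]]
```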
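window(windowDuration, slideDuration=None)

Return a DStream where each RDD contains the elements of all batches within a sliding window.

Parameters:
  • windowDuration (float) – window length in seconds, a multiple of the batch interval
  • slideDuration (float) – sliding interval in seconds, a multiple of the batch interval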
Return type:

DStream

Example:

>>> import pysparkling
>>> sc = pysparkling.Context()
>>> ssc = pysparkling.streaming.StreamingContext(sc, 0.2)
>>> (
...     ssc
...     .queueStream([[1], [2], [3], [4], [5], [6]])
...     .window(0.6)
...     .foreachRDD(lambda rdd: print(rdd.collect()))
... )
>>> ssc.start()
>>> ssc.awaitTermination(1.3)
[1]
[1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
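With a batch interval of 0.2 seconds, window(0.6) spans three batches and, with the default slide, the example emits one RDD per batch. The same arithmetic on plain lists reproduces that output, and taking the length of each window reproduces the countByWindow() example. windowed is a hypothetical helper that counts window and slide in batches rather than seconds.

```python
def windowed(batches, window_batches, slide_batches=1):
    """Concatenate the last window_batches batches, every slide_batches batches."""
    windows = []
    for end in range(slide_batches, len(batches) + 1, slide_batches):
        start = max(0, end - window_batches)
        windows.append([x for batch in batches[start:end] for x in batch])
    return windows


print(windowed([[1], [2], [3], [4], [5], [6]], 3))
# [[1], [1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
print([len(w) for w in windowed([[1, 1, 5], [5, 5, 2, 4], [1, 2]], 2)])
# [3, 7, 6]
```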

fileio

The functionality provided by this module is used in Context.textFile() for reading and in RDD.saveAsTextFile() for writing.

You can use this submodule with File.dump(), File.load() and File.exists() to write, read and check for the existence of a file. All methods transparently handle various schemes (for example http://, s3:// and file://) and compression/decompression of .gz and .bz2 files (among others).
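Transparent compression typically means choosing a codec from the file name's extension. The following is a hedged, local-files-only sketch using the standard library; dump_file, load_file and CODECS are hypothetical names, and pysparkling's File additionally handles remote schemes and more formats.

```python
import bz2
import gzip
import os
import tempfile

# Map a file extension to (compress, decompress) functions.
CODECS = {
    '.gz': (gzip.compress, gzip.decompress),
    '.bz2': (bz2.compress, bz2.decompress),
}


def _codec(file_name):
    """Return the codec for file_name, or identity functions for plain files."""
    extension = os.path.splitext(file_name)[1]
    return CODECS.get(extension, (lambda b: b, lambda b: b))


def dump_file(file_name, data):
    """Write bytes, compressing them according to the file extension."""
    with open(file_name, 'wb') as f:
        f.write(_codec(file_name)[0](data))


def load_file(file_name):
    """Read bytes, decompressing them according to the file extension."""
    with open(file_name, 'rb') as f:
        return _codec(file_name)[1](f.read())


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'hello.txt.gz')
    dump_file(path, b'hello world')
    print(os.path.exists(path), load_file(path))  # True b'hello world'
```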

class pysparkling.fileio.File(file_name)

File object.

Parameters:file_name – Any file name.
static resolve_filenames(all_expr)

Resolve an expression to a list of file names.

Parameters:all_expr – A comma-separated list of expressions. The expressions can contain the wildcard characters * and ?. It also resolves Spark datasets to the paths of the individual partitions (e.g. my_data gets resolved to [my_data/part-00000, my_data/part-00001]).
Returns:A list of file names.
Return type:list
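For local paths, the documented resolution rules (comma-separated expressions, * and ? wildcards, Spark dataset directories expanded to their part files) can be approximated with the standard glob module. resolve_filenames_sketch is hypothetical and ignores remote schemes.

```python
import glob
import os
import tempfile


def resolve_filenames_sketch(all_expr):
    """Resolve comma-separated glob expressions to a list of local file names."""
    file_names = []
    for expr in all_expr.split(','):
        if os.path.isdir(expr):
            # A Spark dataset directory resolves to its partition files.
            file_names.extend(sorted(glob.glob(os.path.join(expr, 'part-*'))))
        else:
            file_names.extend(sorted(glob.glob(expr)))
    return file_names


with tempfile.TemporaryDirectory() as tmp:
    os.mkdir(os.path.join(tmp, 'my_data'))
    for name in ('a.txt', 'b.txt', 'my_data/part-00000', 'my_data/part-00001'):
        open(os.path.join(tmp, name), 'w').close()
    expr = os.path.join(tmp, '?.txt') + ',' + os.path.join(tmp, 'my_data')
    resolved = [os.path.relpath(p, tmp) for p in resolve_filenames_sketch(expr)]
print(resolved)  # ['a.txt', 'b.txt', 'my_data/part-00000', 'my_data/part-00001'] on POSIX
```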
exists()

Checks whether a file or directory exists at this location.

Returns:True or False.
load()

Load the data from a file.

Return type:io.BytesIO
dump(stream=None)

Writes a stream to a file.

Parameters:stream – A BytesIO instance. bytes are also possible and are converted to BytesIO.
Return type:File
make_public(recursive=False)

Makes the file public. Currently only supported on S3.

Parameters:recursive – Whether to apply this recursively.
Return type:File
class pysparkling.fileio.TextFile(file_name)

Derived from File.

Parameters:file_name – Any text file name.
load(encoding=u'utf8', encoding_errors=u'ignore')

Load the data from a file.

Parameters:
  • encoding (str) – The character encoding of the file.
  • encoding_errors (str) – How to handle encoding errors.
Return type:

io.StringIO

dump(stream=None, encoding=u'utf8', encoding_errors=u'ignore')

Writes a stream to a file.

Parameters:
  • stream – An io.StringIO instance. A basestring is also possible and gets converted to io.StringIO.
  • encoding – (optional) The character encoding of the file.
Return type:

TextFile

File System

class pysparkling.fileio.fs.FileSystem(file_name)

Interface class for the file system.

Parameters:file_name (str) – File name.
static resolve_filenames(expr)

Resolve the given glob-like expression to filenames.

Return type:list
exists()

Check whether the given file_name exists.

Return type:bool
load()

Load a file to a stream.

Return type:io.BytesIO
load_text(encoding='utf8', encoding_errors='ignore')

Load a file to a text stream.

Parameters:
  • encoding (str) – Text encoding.
  • encoding_errors (str) – How to handle encoding errors.
Return type:

io.StringIO

dump(stream)

Dump a stream to a file.

Parameters:stream (io.BytesIO) – Input stream.
make_public(recursive=False)

Make the file public (only on some file systems).

Parameters:recursive (bool) – Recurse.
Return type:FileSystem
class pysparkling.fileio.fs.Local(file_name)

FileSystem implementation for the local file system.

class pysparkling.fileio.fs.GS(file_name)

FileSystem implementation for Google Storage.

Paths are of the form gs://bucket_name/file_path or gs://project_name:bucket_name/file_path.

mime_type = 'text/plain'

Default mime type.

project_name = None

Set a default project name.
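The two documented path forms differ only in whether a project name precedes the bucket. The hypothetical helper parse_gs_path below splits both forms and falls back to a default project when none is given, which mirrors the role of project_name above by assumption; it is not pysparkling's parser.

```python
def parse_gs_path(path, default_project=None):
    """Split a gs:// path into (project_name, bucket_name, file_path)."""
    prefix = 'gs://'
    if not path.startswith(prefix):
        raise ValueError('not a gs:// path: {}'.format(path))
    bucket_part, _, file_path = path[len(prefix):].partition('/')
    project_name, separator, bucket_name = bucket_part.partition(':')
    if not separator:  # no project in the path: fall back to the default
        project_name, bucket_name = default_project, bucket_part
    return project_name, bucket_name, file_path


print(parse_gs_path('gs://my-project:my-bucket/data/file.txt'))
# ('my-project', 'my-bucket', 'data/file.txt')
print(parse_gs_path('gs://my-bucket/data/file.txt', default_project='fallback'))
# ('fallback', 'my-bucket', 'data/file.txt')
```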

class pysparkling.fileio.fs.Hdfs(file_name)

FileSystem implementation for HDFS.

class pysparkling.fileio.fs.Http(file_name)

FileSystem implementation for HTTP.

class pysparkling.fileio.fs.S3(file_name)

FileSystem implementation for S3.

Use environment variables AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID for auth and use file paths of the form s3://bucket_name/filename.txt.

connection_kwargs = {}

Keyword arguments for new connections. Example: set to {'anon': True} for anonymous connections.

Codec

class pysparkling.fileio.codec.Codec

Base class for compression codecs; the classes below implement it for specific formats.

compress(stream)

Compress.

Parameters:stream (io.BytesIO) – Uncompressed input stream.
Return type:io.BytesIO
decompress(stream)

Decompress.

Parameters:stream (io.BytesIO) – Compressed input stream.
Return type:io.BytesIO
class pysparkling.fileio.codec.Bz2

Implementation of Codec for bz2 compression.

class pysparkling.fileio.codec.Gz

Implementation of Codec for gz compression.

class pysparkling.fileio.codec.Lzma

Implementation of Codec for lzma compression.

Needs Python >= 3.3.

class pysparkling.fileio.codec.SevenZ

Implementation of Codec for 7z compression.

Needs the pylzma module.

class pysparkling.fileio.codec.Tar

Implementation of Codec for tar compression.

class pysparkling.fileio.codec.TarGz

Implementation of Codec for .tar.gz compression.

class pysparkling.fileio.codec.TarBz2

Implementation of Codec for .tar.bz2 compression.

class pysparkling.fileio.codec.Zip

Implementation of Codec for zip compression.
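Every codec exposes the same two methods, each taking and returning an io.BytesIO stream. A stand-in with that interface, built on the standard gzip module, shows the round trip; GzipCodecSketch is hypothetical and not pysparkling's Gz class.

```python
import gzip
import io


class GzipCodecSketch(object):
    """Hypothetical codec with the compress/decompress interface above."""

    def compress(self, stream):
        """Read an uncompressed BytesIO and return a gzip-compressed BytesIO."""
        return io.BytesIO(gzip.compress(stream.read()))

    def decompress(self, stream):
        """Read a gzip-compressed BytesIO and return the uncompressed BytesIO."""
        return io.BytesIO(gzip.decompress(stream.read()))


codec = GzipCodecSketch()
compressed = codec.compress(io.BytesIO(b'hello world'))
print(codec.decompress(compressed).read())  # b'hello world'
```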

Parallelization

Pysparkling supports parallelization on the local machine and across clusters of computers.

Processes and Threads

Single machine parallelization with concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor or multiprocessing.Pool is supported. Use cloudpickle instead of pickle for serialization to support lambda functions (and more) for data transformations.

import concurrent.futures
import pickle

import cloudpickle
import pysparkling

sc = pysparkling.Context(
    pool=concurrent.futures.ProcessPoolExecutor(4),
    serializer=cloudpickle.dumps,
    deserializer=pickle.loads,
)
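The Context reference only requires its pool argument to provide a map(func, iterable) method, so concurrent.futures executors, multiprocessing pools and even a trivial in-process object fit that description. The sketch below checks this duck-typed interface with a thread pool and a hypothetical SerialPool class; it does not construct a pysparkling Context.

```python
from concurrent.futures import ThreadPoolExecutor


class SerialPool(object):
    """Hypothetical pool that runs map() sequentially in the current thread."""

    def map(self, func, iterable):
        return [func(item) for item in iterable]


def square(x):
    return x * x


with ThreadPoolExecutor(4) as executor:
    threaded = list(executor.map(square, range(5)))

serial = list(SerialPool().map(square, range(5)))
print(threaded, serial)  # [0, 1, 4, 9, 16] [0, 1, 4, 9, 16]
```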

Experimental

The following are experimental notes. Most of them don’t even contain examples of how to use these techniques with pysparkling.

ipcluster and IPython.parallel

Local test setup:

ipcluster start --n=2
from IPython.parallel import Client

c = Client()
print(c[:].map(lambda _: 'hello world', range(2)).get())

which should print ['hello world', 'hello world'].

To run on a cluster, create a profile:

ipython profile create --parallel --profile=smallcluster

# start controller:
# Creates ~/.ipython/profile_smallcluster/security/ipcontroller-engine.json
# which is used by the engines to identify the location of this controller.
# This is the local-only IP address. Replace it with the machine's IP
# address so that the engines can find it.
ipcontroller --ip=127.0.0.1 --port=7123 --profile=smallcluster

# start engines (assuming they have access to the
# ipcontroller-engine.json file)
ipengine --profile=smallcluster

Test it in Python:

from IPython.parallel import Client

c = Client(profile='smallcluster')
print(c[:].map(lambda _: 'hello world', range(2)).get())

If you don’t want to start the engines manually, ipcluster comes with “Launchers” that can start them for you: https://ipython.org/ipython-doc/dev/parallel/parallel_process.html#using-ipcluster-in-ssh-mode

StarCluster

Setting up StarCluster was an experiment. However, it does not integrate well with the rest of our EC2 infrastructure, so we switched to a Chef-based setup where we use ipcluster directly. A blocker was that the number of engines per node is not configurable, and we have many map jobs that wait on external responses.

Setup

# install
pip install starcluster

# create configuration
starcluster help  # choose the option to create a sample config file

# add your user id, aws_access_key_id and aws_secret_access_key to config

# create an ssh key (this creates a new key just for starcluster)
# and registers it with AWS
starcluster createkey starclusterkey -o ~/.ssh/starclusterkey.rsa

# add this key to config:
[key starclusterkey]
KEY_LOCATION=~/.ssh/starclusterkey.rsa
# and use this key in the cluster setup:
KEYNAME = starclusterkey

# disable the queue, Sun Grid Engine
# (unnecessary for pysparkling and takes time during setup)
DISABLE_QUEUE=True

# to enable IPython parallel support, uncomment these lines in config:
[plugin ipcluster]
SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster

# and make sure you have this line inside the cluster section
[cluster smallcluster]
PLUGINS = ipcluster

# start the cluster
starcluster start smallcluster

# check it has started
starcluster listclusters

We currently use ami-da180db2 (Ubuntu 14.04 with 100GB EBS) on m3.medium instances.

Workarounds:

# this seems to be a dependency that does not get installed
pip install pexpect

# to validate the ssh host, you need to log in once manually, to add it
# to the list of known hosts
starcluster sshmaster smallcluster

In Python, you should now be able to run

from IPython.parallel import Client

# the exact command is printed after the cluster started
rc = Client('/Users/sven/.starcluster/ipcluster/SecurityGroup:@sc-smallcluster-us-east-1.json',
            sshkey='/Users/sven/.ssh/starclusterkey.rsa', packer='pickle')

view = rc[:]
results = view.map(lambda x: x**30, range(8))
print(results.get())

which is also in tests/starcluster_simple.py.

Install your own software that is not on pypi:

pip install wheel
python setup.py bdist_wheel  # add --universal for Python2 and 3 packages
starcluster put smallcluster dist/your_package_name.whl /home/sgeadmin/your_package_name.whl

# ssh into remote machine
starcluster sshmaster smallcluster
> pip install --upgrade pip
> pip install wheel
> pip2.7 install /home/sgeadmin/your_package_name.whl

Development

Fork the Github repository and apply your changes in a feature branch. To run pysparkling’s unit tests:

# install
pip install -e .[hdfs,performance,streaming,tests]
flake8 --install-hook

# run linting and test
flake8
PERFORMANCE=1 nosetests -vv

Don’t run python setup.py test, as this will not execute the doctests. When all tests pass, create a Pull Request on GitHub. Please also update HISTORY.rst with a short description of your change.

To preview the docs locally, install the extra dependencies with pip install -r docs/requirements.txt, then cd into docs/sphinx, run make html and open docs/sphinx/_build/html/index.html.

Please also try not to add derivative work from other projects. If you do, incorporate proper handling of external licenses in your Pull Request.