Welcome to Optimus’s documentation!


As data scientists, we care about extracting the best information out of our data. Data is the new soil: you have to get in and get your hands dirty, because without cleaning and preparing it, it is just useless.

Data preparation accounts for about 80% of a data scientist's work, so it is really important to have a solution that connects to your database or file system, uses the most important framework for machine learning and data science at the moment (Apache Spark), and can handle lots of information, either on a cluster in a parallelized fashion or locally on your laptop.

Say Hi! to Optimus and visit our web page.

Prepare, process and explore your Big Data with the fastest open-source library on the planet, using Apache Spark and Python (PySpark).

Overview


Description

Optimus (by Iron) is the missing framework for cleaning, pre-processing and exploring data in a distributed fashion. It uses all the power of Apache Spark (optimized via Catalyst) to do so. It implements several handy tools for data wrangling and munging that will make your life much easier. The first obvious advantage over any other public data cleaning library or framework is that it works on your laptop or on your big cluster, and the second is that it is amazingly easy to install, use and understand.

Installation

In your terminal just type:

pip install optimuspyspark

Requirements

  • Apache Spark >= 2.3.1 (installed with the package)
  • Python >= 3.6
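
After installing, a quick sanity check is to start an Optimus session and print the version of the Spark it ships with (a minimal sketch; the exact output depends on your environment):

# Start Optimus (this also starts the bundled Spark session)
from optimus import Optimus

op = Optimus()

# Print the version of the Spark session Optimus runs on
print(op.spark.version)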

Column Operations

Here you will see a detailed overview of all the column operations available in Optimus. You can access the operations via df.cols

Let’s create a sample dataframe to start working.

# Import Optimus
from optimus import Optimus
# Create Optimus instance
op = Optimus()

from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType, ArrayType

df = op.create.df(
    [
        ("words", "str", True),
        ("num", "int", True),
        ("animals", "str", True),
        ("thing", StringType(), True),
        ("two strings", StringType(), True),
        ("filter", StringType(), True),
        ("num 2", "string", True),
        ("col_array", ArrayType(StringType()), True),
        ("col_int", ArrayType(IntegerType()), True)
    ],
    [
        ("  I like     fish  ", 1, "dog", "housé", "cat-car", "a", "1", ["baby", "sorry"], [1, 2, 3]),
        ("    zombies", 2, "cat", "tv", "dog-tv", "b", "2", ["baby 1", "sorry 1"], [3, 4]),
        ("simpsons   cat lady", 2, "frog", "table", "eagle-tv-plus", "1", "3", ["baby 2", "sorry 2"], [5, 6, 7]),
        (None, 3, "eagle", "glass", "lion-pc", "c", "4", ["baby 3", "sorry 3"], [7, 8])
    ])

To see the dataframe we will use the table() function, a much better way to see your results than the built-in show() function.

df.table()
words num animals thing two strings filter num 2 col_array col_int
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3]
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4]
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7]
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8]

cols.append(col_name=None, value=None)

Appends a column to a Dataframe

df = df.cols.append("new_col_1", 1)
df.table()
words num animals thing two strings filter num 2 col_array col_int new_col_1
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1
from pyspark.sql.functions import *

df.cols.append([
    ("new_col_2", 2.22),
    ("new_col_3", lit(3))
    ]).table()
words num animals thing two strings filter num 2 col_array col_int new_col_1 new_col_2 new_col_3
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1 2.22 3
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1 2.22 3
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1 2.22 3
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1 2.22 3
df.cols.append([
    ("new_col_4", "test"),
    ("new_col_5", df['num']*2),
    ("new_col_6", [1,2,3])
    ]).table()
words num animals thing two strings filter num 2 col_array col_int new_col_1 new_col_4 new_col_5 new_col_6
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1 test 2 [1, 2, 3]
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1 test 4 [1, 2, 3]
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1 test 4 [1, 2, 3]
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1 test 6 [1, 2, 3]

cols.select(columns=None, regex=None, data_type=None)

Select columns using index, column name, regex or data type

columns = ["words", 1, "animals", 3]
df.cols.select(columns).table()
words num animals thing
I like fish 1 dog housé
zombies 2 cat tv
simpsons cat lady 2 frog table
null 3 eagle glass
df.cols.select("n.*", regex = True).show()
num num 2 new_col_1
1 1 1
2 2 1
2 3 1
3 4 1
df.cols.select("*", data_type = "str").table()
thing words animals filter two strings num 2
housé I like fish dog a cat-car 1
tv zombies cat b dog-tv 2
table simpsons cat lady frog 1 eagle-tv-plus 3
glass null eagle c lion-pc 4

cols.rename(columns_old_new=None, func=None)

Changes the name of one or more columns in the DataFrame.

df.cols.rename('num','number').table()
words number animals thing two strings filter num 2 col_array col_int new_col_1
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1
df.cols.rename([('num','number'),("animals","gods")], str.upper).table()
WORDS NUM ANIMALS THING TWO STRINGS FILTER NUM 2 COL_ARRAY COL_INT NEW_COL_1
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1
df.cols.rename(str.lower).table()
words num animals thing two strings filter num 2 col_array col_int new_col_1
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1

cols.cast()

Cast multiple columns to a specific datatype.

It receives a list of tuples of column names and the types they should be cast to. This variable should have the following structure:

colsAndTypes = [('columnName1', 'integer'), ('columnName2', 'float'), ('columnName3', 'string')]

The first element in each tuple is the column name; the second is the final datatype of the column after the transformation is made.

df.cols.cast([("num", "string"),("num 2", "integer")]).dtypes

 [('words', 'string'),
 ('num', 'string'),
 ('animals', 'string'),
 ('thing', 'string'),
 ('two strings', 'string'),
 ('filter', 'string'),
 ('num 2', 'int'),
 ('col_array', 'array<string>'),
 ('col_int', 'array<int>'),
 ('new_col_1', 'int')]

You can cast all columns to a specific type too.

df.cols.cast("*", "string").dtypes

[('words', 'string'),
 ('num', 'string'),
 ('animals', 'string'),
 ('thing', 'string'),
 ('two strings', 'string'),
 ('filter', 'string'),
 ('num 2', 'string'),
 ('col_array', 'string'),
 ('col_int', 'string'),
 ('new_col_1', 'string')]

cols.keep(columns=None, regex=None)

Only keep the columns specified.

df.cols.keep("num").table()
num
1
2
2
3

cols.move(column, position, ref_col)

Move a column to a specific position

df.cols.move("words", "after", "thing").table()
num animals thing words two strings filter num 2 col_array col_int new_col_1
1 dog housé I like fish cat-car a 1 [baby, sorry] [1, 2, 3] 1
2 cat tv zombies dog-tv b 2 [baby 1, sorry 1] [3, 4] 1
2 frog table simpsons cat lady eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
3 eagle glass null lion-pc c 4 [baby 3, sorry 3] [7, 8] 1

cols.sort(order="asc")

Sort the DataFrame's columns in ascending or descending order

df.cols.sort().table()
animals col_array col_int filter new_col_1 num num 2 thing two strings words
dog [baby, sorry] [1, 2, 3] a 1 1 1 housé cat-car I like fish
cat [baby 1, sorry 1] [3, 4] b 1 2 2 tv dog-tv zombies
frog [baby 2, sorry 2] [5, 6, 7] 1 1 2 3 table eagle-tv-plus simpsons cat lady
eagle [baby 3, sorry 3] [7, 8] c 1 3 4 glass lion-pc null
df.cols.sort(order = "desc").table()
words two strings thing num 2 num new_col_1 filter col_int col_array animals
I like fish cat-car housé 1 1 1 a [1, 2, 3] [baby, sorry] dog
zombies dog-tv tv 2 2 1 b [3, 4] [baby 1, sorry 1] cat
simpsons cat lady eagle-tv-plus table 3 2 1 1 [5, 6, 7] [baby 2, sorry 2] frog
null lion-pc glass 4 3 1 c [7, 8] [baby 3, sorry 3] eagle

cols.drop()

Drops a list of columns

df2 = df.cols.drop("num")
df2.table()
words animals thing two strings filter num 2 col_array col_int new_col_1
I like fish dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1
zombies cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
null eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1
df2 = df.cols.drop(["num","words"])
df2.table()
animals thing two strings filter num 2 col_array col_int new_col_1
dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1
cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1
frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1

Chaining

The previous transformations were done step by step, but they can also be chained into a single line of code, as in the cell below. This way is much more efficient and scalable because it takes full advantage of Spark's lazy evaluation and query optimization.

df\
.cols.rename([('num','number')])\
.cols.drop(["number","words"])\
.withColumn("new_col_2", lit("spongebob"))\
.cols.append("new_col_1", 1)\
.cols.sort(order= "desc")\
.rows.drop(df["num 2"] == 3)\
.table()
two strings thing num 2 new_col_2 new_col_1 filter col_int col_array animals
cat-car housé 1 spongebob 1 a [1, 2, 3] [baby, sorry] dog
dog-tv tv 2 spongebob 1 b [3, 4] [baby 1, sorry 1] cat
lion-pc glass 4 spongebob 1 c [7, 8] [baby 3, sorry 3] eagle

cols.unnest(columns, mark=None, n=None, index=None)

Split an array or string into separate columns

df.cols.unnest("two strings","-").table()
words num animals thing two strings filter num 2 col_array col_int two strings_0 two strings_1
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] cat car
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] dog tv
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] eagle tv
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] lion pc

Getting only the element at a given index

df.cols.unnest("two strings","-", index = 1).table()
words num animals thing two strings filter num 2 col_array col_int two strings_1
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] car
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] tv
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] tv
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] pc

Unnest an array of strings

df.cols.unnest(["col_array"]).table()
words num animals thing two strings filter num 2 col_array col_int col_array_0 col_array_1
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] baby sorry
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] baby 1 sorry 1
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] baby 2 sorry 2
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] baby 3 sorry 3

Split into 3 parts

df \
.cols.unnest(["two strings"], n= 3, mark = "-") \
.table()
words num animals thing two strings filter num 2 col_array col_int two strings_0 two strings_1 two strings_2
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] cat car null
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] dog tv null
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] eagle tv plus
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] lion pc null

cols.impute(input_cols, output_cols, strategy="mean")

Imputes missing data from specified columns using the mean or median.

# Create test dataset
df_fill = op.spark.createDataFrame([(1.0, float("nan")), (2.0, float("nan")),
                           (float("nan"), 3.0), (4.0, 4.0), (5.0, 5.0)], ["a", "b"])

df_fill.cols.impute(["a", "b"], ["out_a", "out_b"], "median").table()
a b out_a out_b
1.0 NaN 1.0 4.0
2.0 NaN 2.0 4.0
NaN 3.0 2.0 3.0
4.0 4.0 4.0 4.0
5.0 5.0 5.0 5.0

cols.select_by_dtypes(data_type)

Returns the DataFrame columns that match the provided data type.

df.cols.select_by_dtypes("int").table()
num
1
2
2
3

cols.apply_by_dtypes(columns, func, func_return_type, args=None, func_type=None, data_type=None)

Apply a function using a Pandas UDF, or a plain UDF if Apache Arrow is not available.

In the next example we replace the numeric values in the string column "filter" with "new string":

def func(val, attr):
    return attr

df.cols.apply_by_dtypes("filter", func, "string", "new string", data_type="integer").table()
words num animals thing two strings filter num 2 col_array col_int
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3]
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4]
simpsons cat lady 2 frog table eagle-tv-plus new string 3 [baby 2, sorry 2] [5, 6, 7]
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8]

User Defined Functions in Optimus

Now we'll create a UDF that adds a value (32 in this case) to two columns

df = df.cols.append("new_col_1", 1)

def func(val, attr):
    return val + attr

df.cols.apply(["num", "new_col_1"], func, "int", 32 ,"udf").table()
words num animals thing two strings filter num 2 col_array col_int new_col_1
I like fish 33 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 33
zombies 34 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 33
simpsons cat lady 34 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 33
null 35 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 33

Now we'll create a Pandas UDF that adds a value (10 in this case) to two columns

def func(val, attr):
    return val + attr

df.cols.apply(["num", "new_col_1"], func, "int", 10).table()
words num animals thing two strings filter num 2 col_array col_int new_col_1
I like fish 11 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 11
zombies 12 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 11
simpsons cat lady 12 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 11
null 13 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 11

Create an abstract UDF to filter rows where the value of column "num" > 1

from optimus.functions import abstract_udf as audf

def func(val, attr):
    return val>1

df.rows.select(audf("num", func, "boolean")).table()
words num animals thing two strings filter num 2 col_array col_int new_col_1
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1

Create an abstract UDF (Pandas UDF) that passes two arguments to a function and applies a sum operation

from optimus.functions import abstract_udf as audf

def func(val, attr):
    return val+attr[0]+ attr[1]

df.withColumn("num_sum", audf ("num", func, "int", [10,20])).table()
words num animals thing two strings filter num 2 col_array col_int new_col_1 num_sum
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1 31
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1 32
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1 32
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1 33

cols.apply_expr(columns, func=None, args=None, filter_col_by_dtypes=None, verbose=True)

Apply an expression to a column.

Here we'll apply a column expression that returns 10 when the value of "num" or "num 2" is greater than 2, and 1 otherwise:

from pyspark.sql import functions as F
def func(col_name, attr):
    return F.when(F.col(col_name)>2 ,10).otherwise(1)

df.cols.apply_expr(["num","num 2"], func).table()
words num animals thing two strings filter num 2 col_array col_int new_col_1
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1
zombies 1 cat tv dog-tv b 1 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady 1 frog table eagle-tv-plus 1 10 [baby 2, sorry 2] [5, 6, 7] 1
null 10 eagle glass lion-pc c 10 [baby 3, sorry 3] [7, 8] 1

Convert to uppercase:

from pyspark.sql import functions as F
def func(col_name, attr):
    return F.upper(F.col(col_name))

df.cols.apply_expr(["two strings","animals"], func).table()
words num animals thing two strings filter num 2 col_array col_int new_col_1
I like fish 1 DOG housé CAT-CAR a 1 [baby, sorry] [1, 2, 3] 1
zombies 2 CAT tv DOG-TV b 2 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady 2 FROG table EAGLE-TV-PLUS 1 3 [baby 2, sorry 2] [5, 6, 7] 1
null 3 EAGLE glass LION-PC c 4 [baby 3, sorry 3] [7, 8] 1

cols.count_na(columns)

Returns the NaN and null count for a column.

import numpy as np

df_null = op.spark.createDataFrame(
    [(1, 1, None), (1, 2, float(5)), (1, 3, np.nan), (1, 4, None), (1, 5, float(10)), (1, 6, float('nan')), (1, 6, float('nan'))],
    ('session', "timestamp1", "id2"))

df_null.cols.count_na("*")

Out -> {'session': 0, 'timestamp1': 0, 'id2': 5}

cols.count_uniques(columns, estimate=True)

Returns how many unique items exist in a column

df.cols.count_uniques("*")

And you’ll get:

{'words': {'approx_count_distinct': 3},
 'num': {'approx_count_distinct': 3},
 'animals': {'approx_count_distinct': 4},
 'thing': {'approx_count_distinct': 4},
 'two strings': {'approx_count_distinct': 4},
 'filter': {'approx_count_distinct': 4},
 'num 2': {'approx_count_distinct': 4},
 'col_array': {'approx_count_distinct': 3},
 'col_int': {'approx_count_distinct': 4},
 'new_col_1': {'approx_count_distinct': 1}}

cols.replace(columns, search_and_replace=None, value=None, regex=None)

Replace a value or a list of values by a specified string

Replace "dog" and "cat" in column "animals" with the word "animals":

df.cols.replace("animals",["dog","cat"],"animals").table()

words num animals thing two strings filter num 2 col_array col_int new_col_1
I like fish 1 animals housé cat-car a 1 [baby, sorry] [1, 2, 3] 1
zombies 2 animals tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1

Replace “dog-tv”, “cat”, “eagle”, “fish” in columns “two strings”,”animals” by “animals”:

df.cols.replace(["two strings","animals"], ["dog-tv", "cat", "eagle", "fish"], "animals").table()
words num animals thing two strings filter num 2 col_array col_int new_col_1
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1
zombies 2 animals tv animals b 2 [baby 1, sorry 1] [3, 4] 1
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1
null 3 animals glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1

cols.nest(input_cols, output_col, shape=None, separator=" ")

Concatenate multiple columns into one with the specified format

Merge two columns into a vector column:

df.cols.nest(["num", "new_col_1"], output_col = "col_nested", shape ="vector").table()

words num animals thing two strings filter num 2 col_array col_int new_col_1 col_nested
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1 [1.0,1.0]
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1 [2.0,1.0]
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1 [2.0,1.0]
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1 [3.0,1.0]

Merge two columns into a string column:

df.cols.nest(["animals", "two strings"], output_col= "col_nested", shape = "string").table()
words num animals thing two strings filter num 2 col_array col_int new_col_1 col_nested
I like fish 1 dog housé cat-car a 1 [baby, sorry] [1, 2, 3] 1 dog cat-car
zombies 2 cat tv dog-tv b 2 [baby 1, sorry 1] [3, 4] 1 cat dog-tv
simpsons cat lady 2 frog table eagle-tv-plus 1 3 [baby 2, sorry 2] [5, 6, 7] 1 frog eagle-tv-plus
null 3 eagle glass lion-pc c 4 [baby 3, sorry 3] [7, 8] 1 eagle lion-pc
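
The separator argument controls the string placed between values when shape is "string". A quick sketch (output not shown; using separator as a keyword argument is an assumption based on the signature above):

# Merge two string columns using "-" as the separator instead of the default space
df.cols.nest(["animals", "two strings"], output_col="col_nested", shape="string", separator="-").table()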

Row Operations

Here you will see a detailed overview of all the row operations available in Optimus. You can access the operations via df.rows

Let’s create a sample dataframe to start working.
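
The creation code is not shown on the original page; here is a sketch that builds an equivalent DataFrame with op.create.df (the column types are assumptions):

# Sample DataFrame for the row operations examples (types are assumed)
df = op.create.df(
    [
        ("words", "str", True),
        ("num", "int", True),
        ("animals", "str", True),
        ("thing", "str", True),
        ("second", "int", True),
        ("filter", "str", True)
    ],
    [
        ("I like fish", 1, "dog", "dog housé", 5, "a"),
        ("zombies", 2, "cat", "tv", 6, "b"),
        ("simpsons cat lady", 2, "frog", "table", 7, "1"),
        (None, 3, "eagle", "glass", 8, "c")
    ])

df.table()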

words num animals thing second filter
I like fish 1 dog dog housé 5 a
zombies 2 cat tv 6 b
simpsons cat lady 2 frog table 7 1
null 3 eagle glass 8 c

rows.append(row)

Append a row at the end of a dataframe

df.rows.append(["this is a word",2, "this is an animal", "this is a thing", 64, "this is a filter"]).table()
words num animals thing second filter
I like fish 1 dog dog housé 5 a
zombies 2 cat tv 6 b
simpsons cat lady 2 frog table 7 1
null 3 eagle glass 8 c
this is a word 2 this is an animal this is a thing 64 this is a filter

rows.sort()

Sort rows by one column or by multiple conditions.

df.rows.sort("animals").table()
words num animals thing second filter
simpsons cat lady 2 frog table 7 1
null 3 eagle glass 8 c
I like fish 1 dog dog housé 5 a
zombies 2 cat tv 6 b
df.rows.sort("animals", "desc").table()
words num animals thing second filter
simpsons cat lady 2 frog table 7 1
null 3 eagle glass 8 c
I like fish 1 dog dog housé 5 a
zombies 2 cat tv 6 b
df.rows.sort([("animals","desc"),("thing","asc")]).table()
words num animals thing second filter
simpsons cat lady 2 frog table 7 1
null 3 eagle glass 8 c
I like fish 1 dog dog housé 5 a
zombies 2 cat tv 6 b

rows.select(*args, **kwargs)

Alias of the Spark filter function. Returns rows that match an expression.

df.rows.select(df["num"]==1).table()
words num animals thing second filter
I like fish 1 dog dog housé 5 a

rows.select_by_dtypes(col_name, data_type=None)

This function was built to filter rows depending on the value type detected by Python in the cells of a column (see the sketch below).
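
The original page shows only the result; a plausible call (the exact data_type string is an assumption) is:

df.rows.select_by_dtypes("filter", "int").table()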

words num animals thing second filter
simpsons cat lady 2 frog table 7 1

rows.drop(where=None)

Drop rows matching a DataFrame expression

df.rows.drop((df["num"]==2) | (df["second"]==5)).table()
words num animals thing second filter
null 3 eagle glass 8 c

rows.drop_by_dtypes(col_name, data_type=None)

Drop rows by cell data type

df.rows.drop_by_dtypes("filter", "int").table()
words num animals thing second filter
I like fish 1 dog dog housé 5 a
zombies 2 cat tv 6 b
null 3 eagle glass 8 c

Drop using an abstract UDF

from optimus.functions import abstract_udf as audf

def func_data_type(value, attr):
    return value >1


df.rows.drop(audf("num", func_data_type, "boolean")).table()
words num animals thing second filter
I like fish 1 dog dog housé 5 a

Feature Engineering with Optimus

With Optimus we have made the process of Feature Engineering easy.

When we talk about Feature Engineering we refer to creating new features from your existing ones to improve model performance. Sometimes you need to do this because a certain model doesn't accept the data as you have it, so these transformations let you run most Machine Learning and Deep Learning algorithms.

These methods are part of the DataFrameTransformer, and they provide a high-level abstraction over Spark's Feature Engineering methods. You'll see how easy it is to prepare your data with Optimus for Machine Learning.

Methods for Feature Engineering

fe.string_to_index(input_cols)

This method maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values.

df is the DataFrame to transform; the input_cols argument receives a list of columns to be indexed.

Let’s start by creating a DataFrame with Optimus.

from pyspark.sql import Row, types
from pyspark.ml import feature, classification

from optimus import Optimus

from optimus.ml.models import ML
import optimus.ml.feature as fe

op = Optimus()
ml = ML()
spark = op.spark
sc = op.sc

# Creating sample DF
data = [('Japan', 'Tokyo', 37800000),('USA', 'New York', 19795791),('France', 'Paris', 12341418),
          ('Spain','Madrid',6489162)]
df = op.spark.createDataFrame(data, ["country", "city", "population"])

df.table()
country city population
Japan Tokyo 37800000
USA New York 19795791
France Paris 12341418
Spain Madrid 6489162
# Indexing columns 'city' and 'country'
df_sti = fe.string_to_index(df, input_cols=["city", "country"])

# Show indexed DF
df_sti.table()
country city population city_index country_index
Japan Tokyo 37800000 1.0 1.0
USA New York 19795791 2.0 3.0
France Paris 12341418 3.0 2.0
Spain Madrid 6489162 0.0 0.0

fe.index_to_string(input_cols)

This method maps a column of indices back to a new column of corresponding string values. The index-string mapping is either from the ML (Spark) attributes of the input column, or from user-supplied labels (which take precedence over ML attributes).

df is the DataFrame to transform; the input_cols argument receives a list of index columns to map back to strings.

Let’s go back to strings with the DataFrame we created in the last step.

# Indexing columns 'city' and 'country'
df_sti = fe.string_to_index(df, input_cols=["city", "country"])

# Show indexed DF
df_sti.table()
country city population city_index country_index
Japan Tokyo 37800000 1.0 1.0
USA New York 19795791 2.0 3.0
France Paris 12341418 3.0 2.0
Spain Madrid 6489162 0.0 0.0
# Going back to strings from index
df_its = fe.index_to_string(df_sti, input_cols=["country_index"])

# Show DF with column "country_index" back to string
df_its.table()
country city population country_index city_index country_index_string
Japan Tokyo 37800000 1.0 1.0 Japan
USA New York 19795791 3.0 2.0 USA
France Paris 12341418 2.0 3.0 France
Spain Madrid 6489162 0.0 0.0 Spain

fe.one_hot_encoder(input_cols)

This method maps a column of label indices to a column of binary vectors, with at most a single one-value.

df is the DataFrame to transform; the input_cols argument receives a list of columns to be encoded.

Let’s create a sample dataframe to see what OHE does:

# Creating DataFrame
data = [
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
]
df = op.spark.createDataFrame(data,["id", "category"])

# One Hot Encoding
df_ohe = fe.one_hot_encoder(df, input_cols=["id"])

# Show encoded dataframe
df_ohe.table()
id category id_encoded
0 a (5,[0],[1.0])
1 b (5,[1],[1.0])
2 c (5,[2],[1.0])
3 a (5,[3],[1.0])
4 a (5,[4],[1.0])
5 c (5,[],[])

fe.vector_assembler(input_cols)

This method combines a given list of columns into a single vector column.

The input_cols argument receives a list of columns to be assembled.

This is very important because lots of Machine Learning algorithms in Spark need this format to work.

Let’s create a sample dataframe to see what vector assembler does:

# Import Vectors
from pyspark.ml.linalg import Vectors

# Creating DataFrame
data = [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)]

df = op.spark.createDataFrame(data,["id", "hour", "mobile", "user_features", "clicked"])

# Assemble features
df_va = fe.vector_assembler(df, input_cols=["hour", "mobile", "user_features"])

# Show assembled df
print("Assembled columns 'hour', 'mobile', 'user_features' to vector column 'features'")
df_va.select("features", "clicked").table()
features clicked
[18.0,1.0,0.0,10.0,0.5] 1.0

fe.normalizer(input_cols,p=2.0)

This method transforms a dataset of Vector rows, normalizing each Vector to have unit norm. It takes the parameter p, which specifies the p-norm used for normalization (p=2 by default).

input_cols argument receives a list of columns to be normalized.

p argument is the p-norm used for normalization.

Let’s create a sample dataframe to see what normalizer does:
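
The creation and normalization code is not shown on the original page; here is a sketch that reproduces the output below, assuming fe.normalizer takes the DataFrame as its first argument like the other fe helpers:

from pyspark.ml.linalg import Vectors

# Sample DataFrame with a vector column
df_vec = op.spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0])),
    (1, Vectors.dense([2.0, 1.0, 1.0])),
    (2, Vectors.dense([4.0, 10.0, 2.0]))
], ["id", "features"])

# Normalize each vector to unit norm using the default p=2
fe.normalizer(df_vec, input_cols=["features"], p=2.0).table()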

id features features_normalized
0 [1.0,0.5,-1.0] [0.6666666666666666,0.3333333333333333,-0.6666666666666666]
1 [2.0,1.0,1.0] [0.8164965809277261,0.4082482904638631,0.4082482904638631]
2 [4.0,10.0,2.0] [0.3651483716701107,0.9128709291752769,0.18257418583505536]

Machine Learning with Optimus

Machine Learning is one of the last steps, and the goal, of most Data Science workflows.

Apache Spark provides a library called MLlib with great algorithms for Machine Learning. With the newer ML library we can take advantage of the DataFrame API and its optimizations to easily create Machine Learning pipelines.

Even though this task is not extremely hard, it is not easy either. The way most Machine Learning models work on Spark is not straightforward, and they need lots of feature engineering to work. That's why we created the feature engineering section inside the Transformer.

To use the Machine Learning Library you just need to import Optimus and the ML API:

from pyspark.sql import Row, types
from pyspark.ml import feature, classification
from optimus import Optimus
from optimus.ml.models import ML
from optimus.ml.functions import *

op = Optimus()
ml = ML()
spark = op.spark
sc = op.sc

Let's take a look at what Optimus can do for you:

Methods for Machine Learning

ml.logistic_regression_text(df, input_col)

This method runs a logistic regression for input (text) DataFrame.

Let’s create a sample dataframe to see how it works.

# Import Row from pyspark
from pyspark.sql import Row

df = op.sc. \
    parallelize([Row(sentence='this is a test', label=0.),
                 Row(sentence='this is another test', label=1.)]). \
    toDF()

df.table()
label sentence
0.0 this is a test
1.0 this is another test
df_predict, ml_model = ml.logistic_regression_text(df, "sentence")

This instruction returns two things: first, the DataFrame with the predictions and the columns produced by each step of the pipeline that builds it, and second, a Spark machine learning model in which the third step is the logistic regression.

The columns of df_predict are:

df_predict.columns

['label',
'sentence',
'Tokenizer_4df79504b43d7aca6c0b__output',
'CountVectorizer_421c9454cfd127d9deff__output',
'LogisticRegression_406a8cef8029cfbbfeda__rawPrediction',
'LogisticRegression_406a8cef8029cfbbfeda__probability',
'LogisticRegression_406a8cef8029cfbbfeda__prediction']

The names are long because they are the UIDs of each step in the pipeline. So let's see the prediction compared with the actual labels:

df_predict.cols.select([0,6]).table()
label LogisticRegression_406a8cef8029cfbbfeda__prediction
0.0 0.0
1.0 1.0

So we just did ML with a single line in Optimus. The model is also exposed in the ml_model variable so you can save it and evaluate it.
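
For instance, assuming ml_model behaves like a standard fitted Spark pipeline model (this is a sketch, not Optimus-specific API), you could persist it and score the predictions with plain Spark ML tooling:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Persist the fitted model to disk (the path is just an example)
ml_model.write().overwrite().save("logistic_regression_text_model")

# Evaluate the predictions; the raw prediction column keeps the long uid-based name shown above
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="LogisticRegression_406a8cef8029cfbbfeda__rawPrediction")
print(evaluator.evaluate(df_predict))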

ml.n_gram(df, input_col, n=2)

This method converts the input array of strings inside of a Spark DF into an array of n-grams. The default n is 2 so it will produce bi-grams.

Let’s create a sample dataframe to see how it works.

df = op.sc. \
    parallelize([['this is the best sentence ever'],
                 ['this is however the worst sentence available']]). \
    toDF(schema=types.StructType().add('sentence', types.StringType()))

df_model, tfidf_model = n_gram(df, input_col="sentence", n=2)

The columns of df_model are:

['sentence',
 'Tokenizer_4a0eb7921c3a33b0bec5__output',
 'StopWordsRemover_4c5b9a5473e194516f3f__output',
 'CountVectorizer_41638674bb4c4a8d454c__output',
 'NGram_4e1d89fc70917c522134__output',
 'CountVectorizer_4513a7ba6ce22e617be7__output',
 'VectorAssembler_42719455dc1bde0c2a24__output',
 'features']

So let's see the bi-grams (we can change n as we want) for the sentences:

df_model.cols.select([0,4]).table()
sentence NGram_4e1d89fc70917c522134__output
this is the best sentence ever [best sentence, sentence ever]
this is however the worst sentence available [however worst, worst sentence, sentence available]

And that’s it. N-grams with only one line of code.

Above we've been using the Pyspark Pipes definitions of Daniel Acuña, which he merged with Optimus. Because we use multiple pipelines, we need those long names for the resulting columns so we can know which UID corresponds to each step.

Tree models with Optimus

Yes the rumor is true, now you can build Decision Trees, Random Forest models and also Gradient Boosted Trees with just one line of code in Optimus. Let’s download some sample data for analysis.

We got this dataset from Kaggle. The features are computed from a digitized image of a fine needle aspirate (FNA) of a breast mass. They describe characteristics of the cell nuclei present in the image, as described in [K. P. Bennett and O. L. Mangasarian: "Robust Linear Programming Discrimination of Two Linearly Inseparable Sets", Optimization Methods and Software 1, 1992, 23-34].

Let’s download it with Optimus and save it into a DF:

# Downloading and creating Spark DF
df = op.load.url("https://raw.githubusercontent.com/ironmussa/Optimus/master/tests/data_cancer.csv")

ml.random_forest(df, columns, input_col)

One of the best "tree" models for machine learning is Random Forest. What about creating a RF model with just one line? With Optimus it is really easy.

Here columns is the list of feature columns used to train the model, and input_col ("diagnosis" in this dataset) is the label column; a possible definition is shown in the sketch below.
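
A possible definition, assuming we use the mean features listed in df_predict below as predictors (this is a sketch, not code from the original page):

df_cancer = df
columns = ["radius_mean", "texture_mean", "perimeter_mean", "area_mean",
           "smoothness_mean", "compactness_mean", "concavity_mean",
           "concave points_mean", "symmetry_mean", "fractal_dimension_mean"]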

df_predict, rf_model = ml.random_forest(df_cancer, columns, "diagnosis")

This will create a DataFrame with the predictions of the Random Forest model.

Let’s see df_predict:

['label',
 'diagnosis',
 'radius_mean',
 'texture_mean',
 'perimeter_mean',
 'area_mean',
 'smoothness_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'symmetry_mean',
 'fractal_dimension_mean',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

So let's see the prediction compared with the actual label:

df_predict.select([0,15]).table()
label prediction
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 0.0
1.0 1.0
1.0 1.0
0.0 0.0

only showing top 20 rows

The rf_model variable contains the Random Forest model for analysis.

It will be the same for Decision Trees and Gradient Boosted Trees, let’s check it out.

ml.decision_tree(df, columns, input_col)

df_predict, dt_model = ml.decision_tree(df_cancer, columns, "diagnosis")

This will create a DataFrame with the predictions of the Decision Tree model.

Let’s see df_predict:

['label',
 'diagnosis',
 'radius_mean',
 'texture_mean',
 'perimeter_mean',
 'area_mean',
 'smoothness_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'symmetry_mean',
 'fractal_dimension_mean',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

So let's see the prediction compared with the actual label:

df_predict.select([0,15]).table()
label prediction
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
0.0 0.0

only showing top 20 rows

ml.gbt(df, columns, input_col)

df_predict, gbt_model = ml.gbt(df_cancer, columns, "diagnosis")

This will create a DataFrame with the predictions of the Gradient Boosted Trees model.

Let’s see df_predict:

['label',
 'diagnosis',
 'radius_mean',
 'texture_mean',
 'perimeter_mean',
 'area_mean',
 'smoothness_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'symmetry_mean',
 'fractal_dimension_mean',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

So let's see the prediction compared with the actual label:

df_predict.select([0,15]).show()
label prediction
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
1.0 1.0
0.0 0.0

only showing top 20 rows

Library maintained by Favio Vazquez and Argenis Leon