NStack Documentation¶
Welcome to the NStack Documentation! Please also look at our main GitHub page, and our online examples.
What is NStack?¶
- Productionise your models and data integration code to the cloud as reusable functions that can be monitored, versioned, shared, and updated
- Effortlessly compose workflows to integrate your functions with your data and event sources, without dealing with any infrastructure
- NStack containerises and orchestrates your workflows on your cloud provider and makes sure they always work
NStack Uses¶
NStack allows you to productionise Python and R models and integrate them with data-sources and business platforms. There are many use-cases, but common ones include:
Propensity Analysis on your DMP¶
Productionise a propensity model and schedule it to read data from your data warehouse and send high-scoring users to your DMP (e.g. Krux, BlueKai, Permutive, or Lotame).
Churn Modeling on your CRM¶
Productionise a churn model and schedule it to read data from an internal database and send users at risk of churn to your CRM.
Build an HTTP endpoint from a Model¶
Turn any Python or R model into a robust, shareable HTTP endpoint accessible by other teams.
NStack Concepts¶
Example¶
We can express such a workflow within the NStack scripting language as follows (it can help to think of it as akin to Bash-style piping for microservices):
module Demo:0.1.0 {
  import NStack.Transformers:0.1.4 as T
  import Acme.Classifiers:0.3.0 as C

  // our analytics workflow
  def workflow = Sources.Postgresql<(Text, Int)>
                 | T.transform { strength = 5 }
                 | C.classify { model = "RandomForest" }
                 | Sinks.S3blob<Text>
}
Modules¶
A module is a piece of code that has been deployed to NStack, either by you or someone else. It has an input schema and an output schema, which define the kind of data it can receive and the kind of data it returns.
Sources & Sinks¶
- A source is something which emits a stream of data.
- A sink is something which can receive a stream of data.
Example sources and sinks are databases, files, message-queues, and HTTP endpoints. Like modules, you can define the input and output schemas for your sources and sinks.
Workflows¶
Modules, sources, and sinks can be combined together to build workflows. This is accomplished using the NStack Workflow Language, a simple, high-level language for connecting things together on the NStack Platform.
Processes¶
When a workflow is started, it becomes a running process. You can have multiple processes of the same workflow.
Installation & Upgrading¶
Installation¶
NStack is platform-agnostic and can run out-of-the-box wherever you can run a virtual machine, including:
- your cloud provider of choice
- your internal cloud
- locally using VirtualBox, VMWare, or your operating system’s native virtualisation
For proof-of-concept projects, NStack offers a hosted solution. If this is required, please reach out to info@nstack.com.
The virtual appliance can be found on the NStack GitHub Releases page, where it is provided as a raw image. We also provide an AWS AMI under the id ami-53a47245; basic install instructions can be found on our GitHub page (https://github.com/nstack/nstack). This AMI can be launched to an EC2 instance on your AWS account.
The NStack Server is configured using cloud-init, which is supported by major cloud providers, so it will work out of the box with your credentials. Please note that the first time you boot NStack it may take a few minutes to initialise.
Upgrading¶
The NStack Server can be updated atomically using rpm-ostree. To upgrade to a new release, you can simply run:
> sudo rpm-ostree upgrade
Following the upgrade, you should reboot your machine with:
> sudo reboot
NStack releases follow a monthly cadence.
Quick Tutorial¶
In this section, we’re going to see how to build up a simple NStack module, deploy it to the cloud, and use it in a workflow by connecting it to a source and a sink.
Intro Screencast¶
The following screencast accompanies this tutorial, demonstrating building a module and connecting it within a workflow.
By the end of the tutorial, you will learn how to publish your code to NStack and connect it to event and data sources. Enjoy!
Note
To learn more about modules, sources, and sinks, read Concepts
Make sure you have installed NStack and let’s get going. These instructions are for the Linux and macOS versions of the NStack CLI, so adjust accordingly if you are on Windows.
Building a Module¶
Modules contain the functions that you want to publish to the NStack platform.
After this tutorial, we will have a simple Python module deployed to our NStack instance. This module will have a single function in it which counts the number of characters in some text.
Note
Before starting, check that NStack is installed by running nstack --version in your terminal. If this prints version information, you're good to go. If not, check out Installation again.
Step 1: init¶
We want to create a new Python module.
Create a directory called Demo.NumChars where you would like to build your module and cd into that directory using your terminal. NStack uses the name of the directory as the default name of the module.
To create a new module, run nstack init -l python.
You should see the following output confirming that this operation was successful.
~> mkdir Demo.NumChars
~> cd Demo.NumChars
~/Demo.NumChars> nstack init -l python
python module 'Demo.NumChars:0.0.1-SNAPSHOT' successfully initialised at ~/Demo.NumChars
Because NStack versions your modules, it has given Demo.NumChars a version number (0.0.1-SNAPSHOT). Because the version number has SNAPSHOT appended to it, NStack allows you to overwrite it every time you build. This is helpful for development, as you do not need to constantly increase the version number. When you deem your module ready for release, you can remove SNAPSHOT and NStack will create an immutable version of 0.0.1.
A successful init will have created some files.
~/Demo.NumChars> ls
nstack.yaml requirements.txt service.py setup.py module.nml
This is the skeleton of an NStack module. nstack.yaml is the configuration file for your module, module.nml describes the functions and types defined in your module, and service.py is where the code of your module lives (in this case, it's a Python class). requirements.txt and setup.py are both standard files for configuring Python.
We're going to be concerned with module.nml and service.py. For a more in-depth look at all these files, refer to Module Structure.
In service.py, there is a Module class. This is where we write the functions we want to use on NStack. It is pre-populated with a sample function, numChars, that counts the number of characters in some text.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Demo.NumChars Module
"""
import nstack

class Module(nstack.Module):
    def numChars(self, x):
        return len(x)
module.nml is where you tell NStack which of the functions in service.py you want to publish as functions on NStack, and their input and output schemas (also known as types).
module Demo.NumChars:0.0.1-SNAPSHOT
fun numChars : Text -> Integer
In this instance, we are telling NStack to publish one function, numChars, which takes Text and returns an Integer.
Note
The schema – or type – system is a key feature of NStack that lets you define the sort of data your function can take as input, and produce as output. This helps you ensure that your module can be reused and works as intended in production.
Step 2: build¶
To build and publish our module on NStack, we use the build command.
~/Demo.NumChars> nstack build
Building NStack Container module Demo.NumChars:0.0.1-SNAPSHOT. Please wait. This may take some time.
Module Demo.NumChars:0.0.1-SNAPSHOT built successfully. Use `nstack list functions` to see all available functions
When we run build, the code is packaged up and sent to the server.
We can check that our numChars function is live by running the suggested nstack list functions command:
~/Demo.NumChars> nstack list functions
Demo.NumChars:0.0.1-SNAPSHOT
numChars : Text -> Integer
That's it! Our numChars function is live in the cloud, and is ready to be connected to input and output data streams, which the next tutorial will cover.
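If you'd like to exercise the function right away without wiring up a workflow, the CLI's test command (covered in the nstack CLI reference below) can send it a single JSON-encoded value and log the result, e.g.:
~/Demo.NumChars> nstack test Demo.NumChars:0.0.1-SNAPSHOT numChars '"Hello World"'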
Building a Workflow¶
In the previous tutorial, we built and published a Python module, Demo.NumChars. This module had a single function on it, numChars, which counted the number of characters in some text. Although it has been published, it still needs to be connected to a source and a sink.
Note
Sources generate data which gets sent to your function, and sinks receive the data which your function outputs. Learn more in Concepts
Let’s refresh ourselves on what the input and output types of our function were by asking NStack:
> nstack list functions
Demo.NumChars:0.0.1-SNAPSHOT.numChars : Text -> Integer
This means that our function can be connected to any source which generates Text, and can write to any sink which can take an Integer as input.
One of the sources that NStack provides is http; if you use this source, NStack sets up an HTTP endpoint which you can send JSON-encoded data to. As a sink, we are going to use the NStack log, which is a sink for seeing the output from your function. We are going to use these two integrations in our tutorial.
Note
See a list of available sources and sinks in Supported Integrations
Creating a workflow module¶
To write workflows, we create a special NStack workflow module, which we create in the same way we create a Python module – by using init.
Let's create a new directory called DemoWorkflow, cd into the directory, and create a new workflow module.
~/DemoWorkflow/ nstack init --workflow
Module 'DemoWorkflow:0.0.1-SNAPSHOT' successfully initialised at /home/nstack/Demo/DemoWorkflow
init has created a single file, module.nml, which is where we write our workflow module using NStack's scripting language. If we look inside the file, we see that NStack has created an example module for us.
Note
Just like Python modules, workflow modules are versioned.
module DemoWorkflow:0.0.1-SNAPSHOT
import Demo.NumChars:0.0.1-SNAPSHOT as D
// A sample workflow
def w = Sources.http<Text> { http_path = "/demo" } | D.numChars | Sinks.log<Integer>
This currently has a single workflow in it, w, which uses a function imported from the module Demo.NumChars at version 0.0.1-SNAPSHOT.
Like the workflow we will create, this example workflow creates an HTTP endpoint which pipes data to a function, and pipes data from the function to the NStack log.
Note
There is no need to create a separate module in order to define a workflow. You could have included the definition of w in the module.nml of the original Python module Demo.NumChars. In that case, you would not need to prefix numChars with D., as it would be defined in the same module.
When we created our Python module, we defined the input and output types of our function in our API. On NStack, sources and sinks also have types: this workflow specifies that the HTTP source only receives and passes on Text, and the log only accepts Integers. Because our Python function takes Text, counts it, and returns an Integer, it can fit in the middle of the workflow.
Note
The http source is configured in this example to expose an endpoint on /demo. If you are using the demo server, we would recommend changing /demo to something more unique, as someone else may have already taken that endpoint.
Let's break down these parts to see what each one is doing:
Part | Description |
---|---|
Sources.http<Text> { http_path = "/demo" } | Use http as a source, which creates an endpoint on /demo. The <Text> statement means it can only accept and pass on Text. |
D.numChars | The name of the function which we built. |
Sinks.log<Integer> | Use NStack's log as a sink. The <Integer> statement means it can only accept Integers. |
NStack uses the | operator to connect statements together, just like in a shell such as bash.
Building our workflow¶
Before we start our workflow, we need to build it in the cloud with NStack. We do this in the same way we build a Python module. We save our module.nml file and run:
~/DemoWorkflow/ nstack build
Building NStack Workflow module DemoWorkflow:0.0.1-SNAPSHOT. Please wait. This may take some time.
Workflow module DemoWorkflow:0.0.1-SNAPSHOT built successfully. Use `nstack list all` to see all available functions.
We can now see our workflow is live by using the list command.
~/DemoWorkflow/ nstack list workflows
DemoWorkflow:0.0.1-SNAPSHOT
w : Workflow
This means our workflow is ready to be started.
Starting and using our workflow¶
To start our workflow in the cloud, we use the start command:
~/DemoWorkflow/ $ nstack start DemoWorkflow:0.0.1-SNAPSHOT w
We now have a live HTTP endpoint on localhost:8080/demo. The HTTP endpoint is configured to accept JSON-encoded values. We defined it to use an input schema of Text, so we will be able to send it any JSON string. In our JSON, we put params as the key and our input as the value. We can call the endpoint using nstack send:
~/DemoWorkflow/ $ nstack send "/demo" '"Foo"'
> Message Accepted
When workflows are started, they become processes which have numerical identifiers (ids). We can see the id of our process by running:
~/DemoWorkflow/ $ nstack ps
1
And if we look at the log of our process, which we configured as the sink, we will be able to see the result. Because our process was started with an id of 1, we run the following:
> nstack log 1
Feb 17 09:59:26 nostromo nstack-server[8925]: OUTPUT: 3
Great - we can see that the output of our function (the number of characters in “Foo”) is 3.
In-Depth Tutorial - Productionising a Classifier¶
In this section, we’re going to productionise a Random Forest classifier written with sklearn, deploy it to the cloud, and use it in a more sophisticated workflow.
By the end of the tutorial, you will learn how to build modules with dependencies, write more sophisticated workflows, and build abstractions over data-sources. Enjoy!
So far, we have built and published a Python module with a single function on it, numChars, and built a workflow which connects our function to an HTTP endpoint. This in itself isn't particularly useful, so, now that you've got the gist of how NStack works, it's time to build something more realistic!
In this tutorial, we're going to create and productionise a simple classifier which uses the famous iris dataset. We're going to train our classifier to classify which species an iris is, given measurements of its sepals and petals. The dataset we use to train our model is hosted on GitHub, and we download it below.
First, let's look at the format of our data to see how we should approach the problem. We see that we have five fields:
Field Name | Description | Type |
---|---|---|
species | The species of iris | Text |
sepal_width | The width of the sepal | Double |
sepal_length | The length of the sepal | Double |
petal_width | The width of the petal | Double |
petal_length | The length of the petal | Double |
If we are trying to find the species based on the sepal and petal measurements, these measurements are going to be the input to our classifier module, with text being the output. This means we need to write a function in Python which takes four Doubles and returns Text.
Creating your classifier module¶
To begin, let's make a new directory called Iris.Classify, cd into it, and initialise a new Python module:
~/ $ mkdir Iris.Classify; cd Iris.Classify
~/Iris.Classify/ $ nstack init --language python
python module 'Iris.Classify' successfully initialised at ~/Iris.Classify
Next, let’s download our training data into this directory so we can use it in our module. We have hosted it for you as a CSV on GitHub.
~/Iris.Classify/ $ curl -O https://raw.githubusercontent.com/nstackcom/nstack-examples/master/iris/Iris.Classify/train.csv
Defining our API¶
As we know what the input and output of our classifier is going to look like, let’s edit module.nml
to define our API (i.e. the entry-point into our module). By default, a new module contains a sample function numChars
, which we replace with our definition. We’re going to call the function we write in Python predict
, which means we write our module.nml
as follows:
module Iris.Classify:0.0.1-SNAPSHOT

fun predict : (Double, Double, Double, Double) -> Text
This means we want to productionise a single function, predict, which takes four Doubles (the measurements) and returns Text (the iris species).
Writing our classifier¶
Now that we've defined our API, let's jump into our Python module, which lives in service.py.
We see that NStack has created a class Module. This is where we add the functions for our module. Right now it also has a sample function in it, numChars, which we can remove.
Let's import the libraries we're using.
import nstack
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
Note
Python modules must also import nstack
Before we add our predict function, we're going to add __init__, the Python constructor which runs upon the creation of our module. It's going to load our data from train.csv, and use it to train our Random Forest classifier:
def __init__(self):
    train = pd.read_csv("train.csv")
    self.cols = ['petal_length', 'petal_width', 'sepal_length', 'sepal_width']
    colsRes = ['class']
    trainArr = train.as_matrix(self.cols)
    trainRes = train.as_matrix(colsRes)
    rf = RandomForestClassifier(n_estimators=100)
    rf.fit(trainArr, trainRes)
    self.rf = rf
Now we can write our predict function. The second argument, inputArr, is the input – in this case, our four Doubles. To return text, we simply return from the function in Python.
def predict(self, inputArr):
    points = [inputArr]
    df = pd.DataFrame(points, columns=self.cols)
    results = self.rf.predict(df)
    return results.item()
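Before deploying, it can be handy to sanity-check the class locally. The snippet below is an illustrative sketch only: it assumes the Module class can be instantiated outside the NStack platform and that train.csv is in the working directory, which may not hold for every base class.
if __name__ == "__main__":
    m = Module()  # __init__ loads train.csv and trains the classifier
    # four measurements, in the column order defined in __init__
    print(m.predict([4.7, 1.4, 6.1, 2.9]))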
Configuring your module¶
When your module is started, it is run in a Linux container on the NStack server. Because our module uses libraries like pandas
and sklearn
, we have to tell NStack to install some extra operating system libraries inside your module’s container. NStack lets us specify these in our nstack.yaml
configuration file in the packages
section. Let’s add the following packages:
packages: ['numpy', 'python3-scikit-learn', 'scipy', 'python3-scikit-image', 'python3-pandas']
Additionally, we want to tell NStack to copy our train.csv file into our module, so we can use it in __init__. nstack.yaml also has a section for specifying files you'd like to include:
files: ['train.csv']
Publishing and starting¶
Now we’re ready to build and publish our classifier. Remember, even though we run this command locally, our module gets built and published on your NStack server in the cloud.
~/Iris.Classify/ $ nstack build
Building NStack Container module Iris.Classify. Please wait. This may take some time.
Module Iris.Classify built successfully. Use `nstack list functions` to see all available functions.
We can now see Iris.Classify.predict in the list of existing functions (along with previously built functions like Demo.numChars):
~/Iris.Classify/ $ nstack list functions
Iris.Classify:0.0.1-SNAPSHOT
predict : (Double, Double, Double, Double) -> Text
Demo:0.0.1-SNAPSHOT
numChars : Text -> Integer
Our classifier is now published, but to use it we need to connect it to an event source and sink. In the previous tutorial, we used HTTP as a source and the NStack log as a sink. We can do the same here. This time, instead of creating a workflow module right away, we can use nstack's notebook command to test our workflow first. notebook opens an interactive shell where we can write our workflow; when we are finished, we press Ctrl-D.
~/Iris.Classify/ $ nstack notebook
import Iris.Classify:0.0.1-SNAPSHOT as Classifier;
Sources.http<(Double, Double, Double, Double)> { http_path = "/irisendpoint" } | Classifier.predict | Sinks.log<Text>
[Ctrl-D]
This creates an HTTP endpoint on http://localhost:8080/irisendpoint which can receive four Doubles, and writes the results to the log as Text. Let's check it is running as a process:
~/Iris.Classify/ $ nstack ps
1
2
In this instance, it is running as process 2. We can test our classifier by sending it some of the sample data from train.csv.
~/Iris.Classify/ $ nstack send "/irisendpoint" '[4.7, 1.4, 6.1, 2.9]'
Message Accepted
~/Iris.Classify/ $ nstack log 2
Feb 17 10:32:30 nostromo nstack-server[8925]: OUTPUT: "Iris-versicolor"
Our classifier is now productionised.
Features¶
In this section, we’re going to describe some of the more advanced features you can do with NStack when building your modules and composing them together to build workflows.
Composition¶
Workflows can contain as many steps as you like, as long as the output type of one matches the input type of the next. For instance, let's say we wanted to create the following workflow based on the Iris example in In-Depth Tutorial - Productionising a Classifier (and available on GitHub):
- Expose an HTTP endpoint which takes four Doubles
- Send these Doubles to our classifier, Iris.Classify, which will tell us the species of the iris
- Count the number of characters in the species of the iris using our Demo.numChars function
- Write the result to the log
We could write the following workflow:
module Iris.Workflow:0.0.1-SNAPSHOT

import Iris.Classify:0.0.1-SNAPSHOT as Classifier
import Demo:0.0.1-SNAPSHOT as Demo

def multipleSteps =
  Sources.http<(Double, Double, Double, Double)> { http_path = "/irisendpoint" } |
  Classifier.predict |
  Demo.numChars |
  Sinks.log<Integer>
Note
numChars and predict can be composed together because their types – or schemas – match. If predict wasn't configured to output Text, or numChars wasn't configured to take Text as input, NStack would not let you build this workflow.
Streaming multiple values¶
Sometimes it's desirable to return more than one value from a function. For example, we might want to asynchronously query an HTTP endpoint and process each response independently. If we don't care about any connection between the different results, we can return each result separately.
Let's look at a toy example: a function that takes in a list of numbers and returns them as strings. In this case, each transformation takes a noticeable amount of time to compute.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
NumString:0.0.1-SNAPSHOT Service
"""
import time

import nstack

class Module(nstack.Module):
    def stringToNum(self, xs):
        return [self.transform(x) for x in xs]

    def transform(self, x):
        time.sleep(5)  # TODO: Work out how to make this more efficient
        return str(x)
If we don’t need the entire list at once, we can change this to a Python generator. Rather than working on a list, our next function will have to work on an individual string. That is, when we return a generator, each output is passed individually to the next function.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
NumString:0.0.1-SNAPSHOT Service
"""
import time

import nstack

class Module(nstack.Module):
    def stringToNum(self, xs):
        return (self.transform(x) for x in xs)

    def transform(self, x):
        time.sleep(5)  # TODO: Work out how to make this more efficient
        return str(x)
Workflow Reuse¶
All of the workflows that we have written so far have been fully composed, which means that they contain a source and a sink. Many times, you want to split up sources, sinks, and functions into separate pieces you can share and reuse. In this case, we say that a workflow is partially composed, which just means it does not contain a source and a sink. These workflows cannot be start
ed by themselves, but can be shared and attached to other sources and/or sinks to become fully composed.
For instance, we could combine Iris.Classify.predict and Demo.numChars from the previous example to form a new workflow, speciesLength, like so:
module Iris.Workflow:0.0.1-SNAPSHOT
import Iris.Classify:0.0.1-SNAPSHOT as Classifier
import Demo:0.0.1-SNAPSHOT as Demo
def speciesLength = Classifier.predict | Demo.numChars
Because our workflow Iris.Workflow.speciesLength has not been connected to a source or a sink, it is itself still a function. If we build this workflow, we can see speciesLength alongside our other functions by using the list command:
~/Iris.Workflow/ $ nstack list functions
Iris.Classify:0.0.1-SNAPSHOT
predict : (Double, Double, Double, Double) -> Text
Demo:0.0.1-SNAPSHOT
numChars : Text -> Integer
Iris.Workflow:0.0.1-SNAPSHOT
speciesLength : (Double, Double, Double, Double) -> Integer
As we would expect, the input type of the workflow is the input type of Iris.Classify.predict, and the output type is the output type of Demo.numChars. Like other functions, it must be connected to a source and a sink to be made fully composed, which means we could use this workflow in another workflow:
module Iris.Endpoint:0.0.1-SNAPSHOT
import Iris.Workflow:0.0.1-SNAPSHOT as IrisWF
def http = Sources.http<(Double, Double, Double, Double)> |
IrisWF.speciesLength |
Sinks.log<Integer>
Often you want to reuse a source or a sink without reconfiguring it. To do this, we can similarly separate the sources and sinks into separate definitions, like so:
module Iris.Workflow:0.0.1-SNAPSHOT

import Iris.Classify:0.0.1-SNAPSHOT as Classifier

def httpEndpoint = Sources.http<(Double, Double, Double, Double)> { http_path = "/speciesLength" }
def logSink = Sinks.log<Text>

def speciesWf = httpEndpoint | Classifier.predict | logSink
Separating sources and sinks becomes useful when you're connecting to more complex integrations which you don't want to configure each time you use them – often you want to reuse a source or sink in multiple workflows. In the following example, we are defining a module which provides a source and a sink, both of which sit on top of Postgres.
module Iris.DB:0.0.1-SNAPSHOT

def petalsAndSepals = Sources.postgres<(Double, Double, Double, Double)> {
  pg_database = "flowers",
  pg_query = "SELECT * FROM iris"
}

def irisSpecies = Sinks.postgres<Text> {
  pg_database = "flowers",
  pg_table = "iris"
}
If we built this module, petalsAndSepals and irisSpecies could be used in other modules as sources and sinks themselves.
We may also want to add functions to do some pre- or post-processing to a source or sink. For instance:
module IrisCleanDbs:0.0.1-SNAPSHOT

import PetalTools:1.0.0 as PetalTools
import TextTools:1.1.2 as TextTools
import Iris.DB:0.0.1-SNAPSHOT as DB

def roundedPetalsSource = DB.petalsAndSepals | PetalTools.roundPetalLengths
def irisSpeciesUppercase = TextTools.toUppercase | DB.irisSpecies
Because roundedPetalsSource is a combination of a source and a function, it is still a valid source. Similarly, irisSpeciesUppercase is a combination of a function and a sink, so it is still a valid sink.
Because NStack functions, sources, and sinks can be composed and reused, this lets you build powerful abstractions over infrastructure.
Versioning¶
Modules in NStack are versioned with a 3-digit suffix that is intended to follow semantic versioning, e.g.:
Demo:0.0.1
This is specified in the nstack.yaml for code-based modules, and in module.nml for workflow modules.
A module of a specific version is completely immutable, and it’s not possible to build another copy of the module with the same version without deleting it first.
Snapshots¶
When creating a new module, i.e. with nstack init, your module will be given the version number 0.0.1-SNAPSHOT.
The SNAPSHOT tag tells NStack to allow you to overwrite the version every time you build. This is helpful for development, as you do not need to constantly increase the version number. When you deem your module ready for release, you can remove the SNAPSHOT suffix and NStack will create an immutable version of 0.0.1.
Note
Care is needed when importing SNAPSHOT modules. NStack will warn you if your snapshot module changes in such a way that your imports/pipeline are no longer valid, and will ask you to rebuild if needed. You can also resolve this using project files, which rebuild all dependencies as needed – see NStack Projects.
Configuration¶
In addition to receiving input at runtime, modules, sources, and sinks often need to be configurable by a workflow author. To do this, we use brackets and pass in a list of named parameters:
Sources.postgres<Text> {
  pg_host = "localhost",
  pg_port = "5432",
  pg_user = "user",
  pg_password = "123456",
  pg_database = "db",
  pg_query = "SELECT * FROM tbl;"
}
For sources and sinks, some parameters are mandatory, and some provide sensible defaults. This is documented in Supported Integrations.
To pass configuration parameters to a module, we use the same syntax:
FirstLastName.full_name { first_name = "John" }
NStack passes in configuration parameters as a dictionary, args, which is added to the base class of your module.
For instance, in Python you can access configuration parameters in the following manner:
class Module(nstack.Module):
    def full_name(self, second_name):
        full_name = "{} {}".format(self.args.get("first_name", "Tux"), second_name)
        return full_name
Framework Modules¶
It is often useful to create a common parent module with dependencies already installed, either to save time or for standardisation. NStack supports this with Framework Modules. Simply create a new module similar to above with nstack init --framework [parent], and modify the resulting nstack.yaml as needed.
You can then build this module using nstack build, and refer to it from later modules within the parent field of their nstack.yaml config file.
NStack Projects¶
When using NStack you may find that you are working on several different modules at once that are imported into a main module, where they are composed into a workflow. In these cases it can be cumbersome to rebuild every module manually to ensure all changes are propagated. For these cases, NStack provides projects, which logically group a set of modules together so that they are all built together in the correct order.
Assuming your modules are all contained as directories within ./modules, an NStack project can be formed by creating a file called nstack-project.yaml in the root directory, e.g.
modules/
├── Acme.PCA/
├── Acme.CustomerChurn/
└── nstack-project.yaml
where nstack-project.yaml simply contains the list of module directories, which can be ordered as needed, e.g.
# NStack Acme Project File
modules:
- Acme.PCA
- Acme.CustomerChurn
Simply run nstack build from the root modules directory and all listed modules will be compiled in the order given.
Supported Languages¶
NStack is language-agnostic and allows you to write modules in multiple languages and connect them together – currently we support Python, with R and Java coming soon. More languages will be added over time – if there’s something you really need, please let us know!
Python¶
Basic Structure¶
NStack services in Python inherit from a base class, called Module, within the nstack module:
import nstack

class Module(nstack.Module):
    def numChars(self, msg):
        return len(msg)
Note
Ensure you import the nstack module in your service, e.g. import nstack
Any function that you declare in your module.nml must exist as a method on this class (you can add private methods on this class for internal use, as expected in Python).
Data comes into this function via the method arguments: for nstack, all the data is passed within a single argument that follows the self parameter. For instance, in the example above there is a single argument, msg, consisting of a single string element that may be used as-is. However, if your function was defined as taking (Double, Double), you would need to unpack the tuple in Python first, as follows:
def test(self, msg):
    x, y = msg
    ...
Similarly, to return data from your NStack function, simply return it as a single element within your Python method, as in the top example above.
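For instance (an illustrative sketch – the swap function and its signature are hypothetical, not part of the generated skeleton), a method declared in module.nml as fun swap : (Double, Double) -> (Double, Double) would unpack its argument and return a single tuple:
def swap(self, msg):
    # unpack the (Double, Double) input tuple
    x, y = msg
    # return a single element - here a tuple, matching the
    # (Double, Double) output type declared in module.nml
    return (y, x)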
The NStack object lasts for the lifetime of the workflow. Any initialisation can be performed within the object's startup method, which will be called automatically on service start – e.g. to open a database connection or load a data-set. Similarly, a shutdown method is called on service shutdown; this provides a short time window (90s) to perform any required shutdown routines before the service is terminated, i.e.
import nstack

class Module(nstack.Module):
    def startup(self):
        # custom initialisation here
        pass

    def shutdown(self):
        # custom shutdown here
        pass
Notes¶
- Anything you print will show up in nstack log to aid debugging (all output on stdout and stderr is sent to the NStack logs).
- Extra libraries from PyPI can be installed with pip by adding them to the requirements.txt file in the project directory – they will be installed during nstack build.
Mapping NStack types to Python types¶
The table below shows you which Python types to expect and to return when dealing with types defined in the NStack IDL, as defined in NStack Types:
NStack Type | Python Type |
---|---|
Integer | int |
Double | float |
Boolean | bool |
Text | str |
() | () (empty tuple) or None |
Tuple | tuple |
Struct | dict |
Array | list |
[Byte] | bytes |
x optional | None or x |
Json | a JSON-encoded string* |
*Allows you to specify services that either take or receive JSON-encoded strings as parameters.
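Putting the mapping together, here is an illustrative sketch (the summarise function and its signature are hypothetical): a function declared as fun summarise : { name: Text, scores: [Double] } -> (Text, Double) could be written in Python as:
import nstack

class Module(nstack.Module):
    def summarise(self, msg):
        # an NStack Struct arrives as a Python dict
        name = msg["name"]        # Text maps to str
        scores = msg["scores"]    # [Double] maps to a list of float
        # an NStack Tuple is returned as a Python tuple
        return (name, sum(scores) / len(scores))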
R¶
Coming soon
Java¶
Coming soon
Reference¶
nstack CLI¶
The nstack CLI is used to build modules and workflows on a linked NStack server.
It can be configured via the nstack.conf yaml file, found in ~/.config on Linux/macOS and in HOME/AppData/Roaming on Windows, or indirectly via the nstack set-server command described below.
Usage¶
Having installed the CLI, make sure it’s accessible from your path
$ nstack --version
> nstack 0.0.3
You can find the list of commands and options available by running
$ nstack --help
Commands¶
This section explains the commands supported by the CLI toolkit.
register¶
$ nstack register username email [-s SERVER]
Option | Description |
---|---|
username | A unique username to assign on the server. |
email | An email address to validate the user and send login credentials to. |
SERVER | The remote NStack Server to register with; by default this will use our demo server. |
A simple command to register with a remote NStack server so you can log in, build modules, start workflows, etc. Upon successful registration you will receive credentials over email that you can paste into the nstack CLI and get started.
set-server¶
$ nstack set-server server-addr server-port id key
Option | Description |
---|---|
server-addr | URL of a remote NStack Server |
server-port | Port of a remote NStack Server |
id | Your unique id used to communicate with the remote NStack Server |
key | Your secret key used to communicate with the remote NStack Server |
This command configures your local NStack CLI to communicate with a remote NStack Server with which you have registered (see the previous command). You usually don't have to enter this command by hand; it will be contained in an email sent after successful registration, which you can paste directly into your terminal.
Internally this modifies the nstack.conf CLI config file on your behalf (found in ~/.config on Linux/macOS, and in HOME/AppData/Roaming on Windows).
info¶
$ nstack info
Displays information regarding the entire current state of NStack, including:
- Modules
- Sources
- Sinks
- Running processes
- Base images
init¶
$ nstack init [ --workflow | --framework <parent> | --language <language> ]
Option | Description |
---|---|
--workflow | Initialise an NStack workflow (using the .nml NStack Workflow Language) |
--framework parent | Initialise a module using the specified parent as a base |
--language language | Initialise a module using the specified language, e.g. python |
Initialises a new nstack module in the current directory using the specified base language stack. This creates a working skeleton project which you can use to write your module.
If you are creating a module in an existing programming language, such as Python, init creates a module with a single numChars function already created. The initial project is comprised of the following files:
- nstack.yaml, your service's configuration file (see Module Structure)
- module.nml, which declares your module's functions and types
- service.py, an application file (or service.js, etc.), where your business-logic lives
- an empty packages file (e.g. requirements.txt for Python, or package.json for Node, etc.)
init is also the command used to create a new workflow. In this case, NStack creates a skeleton module.nml file.
To build a new framework module that doesn't inherit from a parent, run nstack init --language <language> and delete the generated module.nml.
build¶
$ nstack build
Builds a module or project on your hosted NStack instance.
Note
build is also used to build workflows. Remember, workflows are modules too!
start¶
$ nstack start <module_name> <function_name>
Option | Description |
---|---|
module_name | The nstack module which contains a fully-composed workflow function |
function_name | The fully-composed workflow function name |
Used to start a workflow as a process. For example,
$ nstack start MyWorkflow:0.0.1 myWorkflow
notebook¶
$ nstack notebook
Creates an interactive session within the terminal that provides a mini-REPL (you can also redirect a file/stream into the notebook command for rapid service testing and development).
From this command line, you can import modules as needed and enter a single workflow that will be compiled and run immediately on the server (press <Ctrl-D> on Linux/macOS or <Ctrl-Z> on Windows to submit your input).
$ nstack notebook
import Demo.Classify:0.0.3 as D;
Sources.http<Text> { http_path = "/classify" } | D.numChars | Sinks.log<Text>
<Ctrl-D>
> Service started successfully as process 5
send¶
$ nstack send "route" 'data'
Option | Description |
---|---|
route | The endpoint on which a workflow is listening for data. |
data | A JSON snippet to send to the endpoint and pass into a workflow. |
Used with the HTTP source, nstack send sends a JSON-encoded element to an endpoint on the NStack server where a workflow has been started. Useful for testing workflows that are to be used as web-hooks.
test¶
$ nstack test <module> <function> <input>
Option | Description |
---|---|
module | A fully-qualified module name, e.g. Foo:0.0.1 |
function | A function in the given module, e.g. numChars |
input | A JSON snippet declaring test input to the function |
Used to test a function by sending it a single piece of input data and logging the results. The function will be started, called with the test data, and then stopped.
test is equivalent to starting the function with an HTTP source and a log sink, using nstack send to send an input value, and then stopping the process. It can be used for unit-testing a function and ensuring the module code works correctly.
The test data should be JSON-encoded, just as with nstack send, and must be of the type the function expects or it will be rejected by the nstack server.
$ nstack test Foo:0.0.1 numChars '"Hello World"'
$ nstack test Foo:0.0.1 sum '[1,2,3]'
ps¶
$ nstack ps
Shows a list of all processes, which are workflows that are running on your nstack server.
list¶
$ nstack list <primitive>
Option | Description |
---|---|
<primitive> | The primitive you want to list. |
Shows a list of available primitives. Supported primitives are modules, workflows, functions, sources, and sinks.
delete¶
$ nstack delete <module>
Option | Description |
---|---|
<module> | The module's name. |
Deletes a module (and thus its functions) from NStack.
log¶
$ nstack log <process>
Option | Description |
---|---|
<process> | The id of the process. |
View the logs of a running process.
Module Structure¶
Introduction¶
The following files are created when you create an nstack module using:
$ nstack init <stack>
module.nml¶
See Workflow Language.
nstack.yaml¶
The nstack.yaml file describes the configuration of your nstack module. It has several fields that describe the project and let you control the packaging of your module.
Sample nstack.yaml file:
# The language stack to use
stack:
  language: python
  api-version: 1
  snapshot: [25, 0]

# (Optional) System-level packages needed
packages: []

# (Optional) Commands to run when building the module (Bash-compatible)
commands: []

# (Optional) Files/Dir to copy across into the module (can use regex/glob syntax)
files: []
stack¶
Optional
The base language stack and version to use when creating an image. Currently we support:
Name | Description |
---|---|
python | Python 3.5 |
python2 | Python 2.7 |
The stack is specified using the following three elements, as demonstrated in the sample:
- language – the programming language to use, taken from the supported table above
- api-version – the version of the language support to use; this is used to ensure compatibility between the server and your functions. The api-version changes only when there is a major change to the way code interacts with the server, and NStack will warn you if it detects a compatibility issue
- snapshot – the specific major and minor versions of the system packages repository to use. These are tied to Fedora Linux versions, e.g. 24, 25, 26, where the second number indicates a snapshot version of the upstream packages that is incremented every fortnight. These are fully reproducible, and it is recommended to keep them at their initial version
parent¶
Optional
The base-image your module builds from. This is typically generated automatically from the stack entry above, but can be explicitly specified to reference custom base-images that may include standardised packages (e.g. a pre-built scipy stack).
Note
Either a stack or parent element is required to build an nstack module containing user-created code in a supported language.
files¶
Optional
A list of files and directories within the project directory to include and bundle in alongside the image.
packages¶
Optional
A list of operating system packages your module requires. These can be any packages installable via dnf on RHEL or Fedora.
NStack Types¶
Primitive types¶
NStack supports the following primitive types:
NStack Type | Notes |
---|---|
Integer | A signed integer |
Double | A 64-bit floating-point value |
Boolean | A true or false value |
Text | Unicode text |
Json | Text containing JSON-encoded content |
Complex types¶
More complex types can be built out of primitive ones:
NStack Type | Syntax | Notes |
---|---|---|
Optional types | T optional | Optional value of type T |
Tuples | (A, B, ...) | A tuple must have at least two fields |
Structs | { x: A, y: B, ... } | |
Arrays | [T] | Use [Byte] to specify a byte-array, i.e. a Blob |
Void | Void | Used to define custom sources and sinks, see Supported Integrations |
Unit | () | Signifies an event which contains no data |
A user can make use of these types and define their own types in the Workflow Language. Look at Supported Languages to see how they can be used from Python, R, etc. in your own modules.
Sending untyped data¶
Most types can be built from combinations of primitive and complex types. However, if you find your types are too complex, or change too often, you can use the Json or [Byte] types to send data between modules either as JSON or as binary blobs.
By doing this, nstack won't be able to ensure that the contracts in your workflow are correct, and this disables automatic decoding/encoding of data. This is helpful when sending data such as Python pickled objects, when prototyping, or when you are in a pinch. However, we recommend creating proper types and contracts for your modules and workflows when possible.
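For example, a pair of hypothetical functions declared as fun encode : Text -> [Byte] and fun decode : [Byte] -> Text could shuttle a pickled Python value between modules as an opaque blob (a sketch; the function names and signatures are illustrative):
import pickle

import nstack

class Module(nstack.Module):
    def encode(self, msg):
        # [Byte] maps to Python bytes, so any picklable value can be sent
        return pickle.dumps(msg)

    def decode(self, blob):
        # the receiving module is responsible for knowing what the bytes mean
        return str(pickle.loads(blob))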
Workflow Language¶
Overview¶
A module consists of:
- module header
- import statements
- type definitions
- external function declarations
- function definitions
in this order, for instance:
module ModuleB:0.1.0
import ModuleA:0.1.0 as A
type BType = A.Type
fun b : Text -> (BType, A.OtherType)
def x = A.y | b
All sections except the module header are optional.
Import Statements¶
An import statement includes the module to be imported (e.g. ModuleA:0.1.0) and its alias (A). The alias is used to qualify types and functions imported from the module, e.g. A.y.
Type Definitions¶
Types are defined using the type
keyword:
type PlantInfo = { petalLength : Double
                 , petalWidth : Double
                 , sepalLength : Double
                 , sepalWidth : Double
                 }
type PlantSpecies = Text
The left-hand side of a type declaration is the new type name; the right-hand side must be an existing type.
A type defined in one module can be used in other modules by prefixing it with the module alias:
module ModuleA:0.0.1
type AText = Text
module ModuleB:0.0.1
import ModuleA:0.0.1 as A
type B = (A.AText, A.AText)
Function Declarations¶
This section declares the types of functions that are backed by containers.
Functions are declared with the fun
keyword:
fun gotham : MovieRecordImage -> MovieRecordImage
Function Definitions¶
Definitions bind function names (x
) to expressions (A.y | b
).
They start with the def
keyword:
def z = filter x
If a name is not prefixed by a module alias, it refers to a function defined in the current module.
Expressions¶
Expressions combine already defined functions through the following operations:
Pipe¶
A.y | A.z
Every value produced by A.y is passed to A.z. The output type of A.y must match the input type of A.z.
Concat¶
concat A.y or A.y*
A.y must be a function that produces lists of values, in which case concat A.y is a function that “unpacks” the lists and yields the same values one by one.
Filter¶
filter A.y or A.y?
A.y must be a function that produces “optional” (potentially missing) values, in which case filter A.y is a function that filters out missing values.
Type application¶
Sources.A<T>
Some functions (notably, most sources and sinks) can be specialised to multiple input or output types. This is done with type application: Sources.http<Text> specialises Sources.http to the type Text.
Parameter application¶
A.y { one = "...", two = "..." }
Parameters are analogous to UNIX environment variables in the following ways:

Parameters are inherited. E.g. in

def y = x
def z = y { foo = "bar" }

both functions x and y will have access to foo when z is called.

Parameters can be overridden. E.g. in

def y = x { foo = "baz" }
def z = y { foo = "bar" }

y overrides the value of foo that is passed to x. Therefore, x will see the value of foo as baz, not bar.
Parameters are used to configure sources and sinks — for instance, to specify how to connect to a PostgreSQL database.
Parameters can also be used to configure user-defined modules.
Inside a Python nstack method, the value of parameter foo can be accessed as self.args["foo"].
Comments¶
The workflow language supports line and block comments. Line comments start with // and extend until the end of the line. Block comments are enclosed in /* and */ and cannot be nested.
EBNF grammar¶
The syntax is defined in EBNF (ISO/IEC 14977) in terms of tokens.
module = 'module', module name
, {import}
, {type}
, {declaration}
, {definition}
;
import = 'import', module name, 'as', module alias;
type = 'type', name, '=', ( type expression | sum type );
declaration = 'fun', name, ':', type expression,
'->', top-level type expression;
definition = 'def', name, '=', expression;
top-level type expression = type expression | 'Void';
type expression = type expression1
                | 'optional', type expression1
                ;
type expression1 = tuple
| struct
| array
| qualified name;
tuple = '(', ')'
| '(', type expression, ',', type expression,
{',', type expression}, ')';
struct = '{', name, ':', type expression,
{',', name, ':', type expression}, '}';
sum type = name, type expression1, '|', name, type expression1,
{'|', name, type expression1};
expression = expression1, {'|', expression1};
expression1 = application, '*'
| application, '?'
| 'concat', application
| 'filter', application
;
application = term [arguments];
arguments = '{', argument binding, {',', argument binding}, '}';
argument binding = name, '=', literal;
term = '(', expression, ')'
| qualified name ['<', type, '>']
;
Supported Integrations¶
NStack is built to integrate with existing infrastructure, event, and data-sources. Typically, this is by using them as sources and sinks in the NStack Workflow Language.
See also
Learn more about sources and sinks in Concepts
Sources¶
Schedule¶
Sources.schedule<()> {
  cron = "* * * * * *"
}
NStack's Schedule source allows you to run a workflow at intervals over a time period. It takes a single argument, a crontab, which specifies the interval to use.
Note that NStack's scheduler expects six fields: minute, hour, day of month, month, day(s) of week, year. The events the scheduler emits contain no data, so its type is Unit, which is written ().
Postgres¶
Sources.postgres<Text> {
  pg_host = "localhost", pg_port = "5432",
  pg_user = "user", pg_password = "123456",
  pg_database = "db", pg_query = "SELECT * FROM tbl;" }
pg_port defaults to 5432, pg_user defaults to postgres, and pg_password defaults to the empty string. The other parameters are mandatory.
HTTP¶
Sources.http<Text> { http_path = "/foo" }
NStack’s HTTP source allows you to expose an NStack workflow as an HTTP endpoint with a very simple API.
The HTTP source must be configured with the http_path, a relative URL for the endpoint on which it will listen for requests, e.g. /foo. All HTTP source endpoints listen on port 8080.
To call the endpoint, you need to send an HTTP request with the following properties:
Parameter | Value |
---|---|
verb | PUT or POST |
content-type | any allowed, but we suggest application/json |
The request should have a body containing a single JSON object with a single property called params, which contains a JSON-encoded value of the type expected by the source, e.g.
{ "params" : "Hello World" }
With curl:
Using the command-line utility curl, you can easily run a single command to call the endpoint:
curl -X PUT -d '{ "params" : "Hello World" }' demo.nstack.com:8080/foo
With nstack send:
The nstack CLI utility has a built-in send command for interacting with HTTP sources. To use it, just pass in the endpoint's relative URL and the JSON-encoded value to send (no need to specify the full params object):
nstack send "/foo" '"Hello World"'
Note
Note the double quoting on the value – the outer pair of (single) quotes is consumed by the shell; the inner quotes are part of the JSON representation of a string.
RabbitMQ (AMQP)¶
Sources.amqp<Text> {
  amqp_host = "localhost", amqp_port = "5672",
  amqp_vhost = "/", amqp_exchange = "ex",
  amqp_key = "key"
}
amqp_port defaults to 5672 and amqp_vhost defaults to /. The other parameters are mandatory.
Stdin¶
Sources.stdin<Text>
Sources.stdin has type Text. It does not take any arguments and does not require a type annotation, but if the type annotation is present, it must be Text.
When Sources.stdin is used as a process's source, you can connect to that process by running nstack connect $PID, where $PID is the process id (as reported by nstack start and nstack ps). After that, every line fed to the standard input of nstack connect will be passed to the process as a separate Text value, without the trailing newline.
To disconnect, simulate end-of-file by pressing Ctrl-D on UNIX or Ctrl-Z on Windows.
BigQuery¶
A module which uploads data to BigQuery, downloads data from BigQuery, or runs an SQL query. See the BigQuery Walkthrough for in-depth documentation.
Custom¶
You can define a custom source in Python by declaring a function of type Void -> t (where t is any supported type except Void) and implementing this function in Python. The return value of this function must be a generator that yields values of type t.
Sinks¶
Postgres¶
Sinks.postgres<Text> {
  pg_host = "localhost", pg_port = "5432",
  pg_user = "user", pg_password = "123456",
  pg_database = "db", pg_table = "tbl" }
As with the Postgres source, pg_port defaults to 5432, pg_user defaults to postgres, and pg_password defaults to the empty string. The other parameters are mandatory.
RabbitMQ (AMQP)¶
Sinks.amqp<Text> {
  amqp_host = "localhost", amqp_port = "5672",
  amqp_vhost = "/", amqp_exchange = "ex",
  amqp_key = "key"
}
As with the AMQP source, amqp_port defaults to 5672 and amqp_vhost defaults to /. The other parameters are mandatory.
AWS S3¶
An NStack sink for uploading files to S3 storage on Amazon Web Services:
import AWS.S3:0.0.1-SNAPSHOT as S3
S3.upload { ...config... }
upload : {filepath: Text, data: [Byte]} -> Text
Uploads a file (represented as a sequence of bytes) to S3 with the given filepath, and returns a Text indicating the item URL.
The following configuration parameters are used for uploading to S3:
- s3_key_id – Your AWS Credentials KeyId
- s3_secret_key – Your AWS Credentials secret key
- s3_bucket – The S3 bucket to upload items into
Stdout¶
Sinks.stdout<Text>
Sinks.stdout has type Text. It does not take any arguments and does not require a type annotation, but if the type annotation is present, it must be Text.
When Sinks.stdout is used as a process's sink, you can connect to that process by running nstack connect $PID, where $PID is the process id (as reported by nstack start and nstack ps). After that, every Text value produced by the process will be printed to the standard output by nstack connect.
To disconnect, simulate end-of-file by pressing Ctrl-D on UNIX or Ctrl-Z on Windows.
Custom¶
You can define a custom sink in Python by declaring a function of type t -> Void (where t is any supported type except Void) and implementing this function in Python as usual. The return value of this function will be ignored.
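For instance, a hypothetical declaration fun logToFile : Text -> Void could be implemented as follows (a sketch; the function name and file path are illustrative):
import nstack

class Module(nstack.Module):
    def logToFile(self, msg):
        # append each incoming Text value to a file inside the module's container;
        # the return value of a sink function is ignored
        with open("/tmp/nstack-sink.log", "a") as f:
            f.write(msg + "\n")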
Conversions¶
JSON¶
Conv.from_json<(Integer,Boolean)>
Conv.to_json<(Integer,Boolean)>
These functions convert between nstack values and Text values containing JSON. They have the types:
Conv.from_json<type> : Text -> type
Conv.to_json<type> : type -> Text
Supported types are:
- Integer
- Double
- Boolean
- Text
- [Byte]
- Arrays of supported types
- Tuples of supported types
- Structs of supported types
CSV¶
Conv.from_csv<(Integer,Boolean)>
Conv.to_csv<(Integer,Boolean)>
These functions convert between nstack values and Text values containing comma-separated fields. They have the types:
Conv.from_csv<type> : Text -> type
Conv.to_csv<type> : type -> Text
Supported field types are:
- Integer
- Double
- Boolean (encoded as TRUE or FALSE)
- Text
- [Byte]
- Optionals of another supported field type
Supported row types are:
- Arrays of supported field types
- Tuples of supported field types
- Structs of supported field types
If the row type is a struct, then the first emitted or consumed value is the CSV header. The column names in the header correspond to the field names of the struct.
If the row type is an array or a tuple, no header is expected or produced.
Text values produced by to_csv are not newline-terminated. Text values consumed by from_csv may or may not be newline-terminated.
BigQuery Walkthrough¶
NStack offers Google Cloud BigQuery integration which you use by extending a built-in BigQuery module with your own data types, SQL, and credentials.
This walkthrough will show you how to use this module to upload data, download data, delete tables, and run queries.
Supported Operations¶
There are five interactions you can have with BigQuery, which are exposed as NStack functions:
Function | Description |
---|---|
runQuery | Execute an SQL query on BigQuery |
downloadData | Download all the rows of data from a table as a single batch |
streamData | Stream rows of data from a table with a configurable batch size |
uploadData | Upload rows of data to a table |
dropTable | Delete a table from BigQuery |
How To¶
Init a new BigQuery Module¶
BigQuery exists as a Framework Module within NStack. Framework modules contain pre-built functions, but require you to add your own files, configuration, and type signatures. In this case, these are our credentials, SQL files, and the type signatures of the data we are uploading or downloading.
Note
Learn more about Framework Modules
To use it, we nstack init a new module and change it to use the BigQuery module as its parent.
> mkdir CustomerTable; cd CustomerTable
> nstack init --framework nstack/BigQuery:0.2.0
The --framework parameter to the init command here sets the parent framework module to be BigQuery, rather than the default Python image.
Add your credentials¶
To interact with BigQuery, the module must be able to authenticate with the BigQuery servers and therefore must have a copy of valid BigQuery credentials.
To add your credentials, you generate them from Google Auth in JSON format.
Note
See https://cloud.google.com/bigquery/authentication for details on how to generate json credentials
Then place them in a file in the module directory, e.g. credentials.json.
{
  "type": "service_account",
  "project_id": "fake-project-462733",
  "private_key_id": "5eb41c28aede90da40529221be4ac85f514134ba",
  "private_key": "-----BEGIN PRIVATE KEY-----...private key here...-----END PRIVATE KEY-----\n",
  "client_email": "my-user@fake-project-462733.iam.gserviceaccount.com",
  "client_id": "981722325615647221019",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://accounts.google.com/o/oauth2/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/tmg-user%40fake-project-462733.iam.gserviceaccount.com"
}
Then add this file to the `files` list in the `nstack.yaml` build file:
files:
- credentials.json
Doing this will include the file in the module. When we use our BigQuery module in a workflow, we will tell the module to use this file by specifying it as a configuration parameter.
Add your SQL¶
Similarly, if we're going to use `runQuery` to execute SQL, we include our SQL script in the module's files list in the same way.
Note
If you are only using `downloadData`, `uploadData`, or `dropTable`, you do not need to include this file, as you are not executing any SQL.
If your SQL query lives in `example_query_job.sql`, copy that file into your module directory and add it to the files list (which already includes your credentials):
files:
- credentials.json
- example_query_job.sql
Define your types and declare your function¶
If you're uploading data from NStack or downloading data into NStack, you declare the types of data your function will take or return. This is either the data you are going to be uploading, or the data you expect to download from the table. Either way, the type signature should match the database schema.
E.g. if you have a table with the following columns:
---------------------------------------------------------------------------------------
| CustomerName VarChar | CustomerAddress VarChar | CustomerId Int64 | CountryId Int64 |
---------------------------------------------------------------------------------------
Then you define a `Customer` type in your module's `module.nml` as follows:
type Customer = {
name : Text,
  address : Text,
id : Int,
countryId : Int
}
Note
The fields must be declared in the same order as the table's columns. The names do not need to match; if you misorder two or more fields whose types still match, you will get results containing the wrong fields.
Once you have the type declared, you can then declare the BigQuery action you wish to take as an NStack function.
Create a `module.nml` file and add in the boilerplate first line, `module CustomerTable:0.0.1-SNAPSHOT where`.
Next, you must write a function definition for one or more of the `runQuery`, `downloadData`, `streamData`, `uploadData`, or `dropTable` functions that exist in the BigQuery parent image. If downloading or uploading, you declare them to use a list of the data type you just declared as input or output.
For instance, to upload a list of customer records to a table:
uploadData : [Customer] -> ()
To download a table as a list of customer records:
downloadData : () -> [Customer]
To stream the customer records with a configurable batch size:
streamData : () -> [Customer]
To execute a single SQL query:
runQuery : () -> ()
To delete a table:
dropTable : () -> ()
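Putting the pieces together, a complete `module.nml` for this walkthrough might look like the following sketch (include whichever signatures match the operations you need):

module CustomerTable:0.0.1-SNAPSHOT where

type Customer = {
  name : Text,
  address : Text,
  id : Int,
  countryId : Int
}

uploadData : [Customer] -> ()
downloadData : () -> [Customer]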
Build your module¶
Once the previous steps have been completed, you can build your module as normal using `nstack build`.
If you run `nstack list functions`, you should see your new functions listed there:
nstack/CustomerTable:0.0.1-SNAPSHOT
downloadData :: () -> [Customer]
Configure and Run¶
Now that your module is registered with the server, you can use the functions in workflows like any other function.
The BigQuery module takes a number of configuration parameters that allow you to configure it for your particular BigQuery project.
All BigQuery functions need the following configuration parameters supplied:
Configuration | Description
---|---
`bq_credentials_file` | Path to the credentials file used to authenticate with BigQuery
`bq_project` | Name of the BigQuery project to use
`bq_dataset` | Name of the BigQuery dataset in the above project to use
The `uploadData`, `downloadData`, `streamData`, and `dropTable` functions also need the following parameter:
Configuration | Description
---|---
`bq_table` | Name of the table to upload to, download from, or delete, respectively
The `streamData` function needs the following parameter:

Configuration | Description
---|---
`bq_batch_size` | Batch size when streaming from a table (1000-10000 recommended)
The `runQuery` function needs the following parameters:

Configuration | Description
---|---
`bq_query_file` | Path to the SQL file containing the query to execute
`bq_query_dest` | Table in which to store the results of the SQL query
The following parameters may also be supplied to `runQuery`, but are optional and can be omitted if not needed:

Configuration | Description
---|---
`bq_maximum_billing_Tier` | Maximum billing tier, if not the default; must be an integer
`bq_use_legacy_sql` | Boolean flag to use the legacy BigQuery SQL dialect rather than standard SQL; should be "Yes", "No", "True", or "False"
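As a sketch, a query that opts into the legacy SQL dialect would add the optional flag alongside the required parameters (the file and table names here are illustrative):

def legacyQuery = CustomerTable.runQuery {
  bq_credentials_file = "credentials.json",
  bq_project = "AcmeCorp",
  bq_dataset = "AcmeCorpSales",
  bq_query_file = "LegacyQuery.sql",
  bq_query_dest = "LegacyResults",
  bq_use_legacy_sql = "True"
}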
For instance, to expose a database uploader as an HTTP endpoint, you might do the following:
def upload = CustomerTable.uploadData {
  bq_credentials_file = "credentials.json",
  bq_project = "AcmeCorp",
  bq_dataset = "AcmeCorpSales",
  bq_table = "CustomerTable"
}
def workflow = Sources.http<[Customer]> { http_path = "/addCustomers" } | upload | Sinks.log<()>
Or to run a query on a given schedule:
def query = CustomerTable.runQuery {
  bq_credentials_file = "credentials.json",
  bq_project = "AcmeCorp",
  bq_dataset = "AcmeCorpSales",
  bq_query_file = "SalesQuery.sql",
  bq_query_dest = "SalesAnalysisResults"
}
def workflow = Sources.schedule<()> { cron = "* * * * * *" } | query | Sinks.log<()>
Template Configuration¶
The BigQuery module supports Jinja2 templates inside its configuration parameters and in the SQL queries it executes. This allows you to build more flexible functions that can cover a wider range of behaviours.
Note
For full details on Jinja2 templates, see http://jinja.pocoo.org/docs/2.9/templates/
The syntax you will use most is the standard expression template, which uses double curly braces:
prefix_{{ some.template.expression }}_suffix
Here the expression in curly braces will be evaluated and replaced with its result.
The Jinja2 templates are evaluated in a sandbox for security reasons, so you do not have access to the full Python standard library. However, date and time functionality is exposed from the `datetime` package and can be accessed through the `date`, `time`, `datetime`, and `timedelta` variables.
E.g. to specify a target table for a query based on today's date, you can use:
runQuery { bq_query_dest = "MyTablePrefix_{{ date.today().strftime('%Y%m%d') }}" }
On the 6th of July 2017, this would write to a table called `MyTablePrefix_20170706`.
These values are evaluated every time the function processes a message, so if you keep the workflow running and send events to the function over multiple days, you will write to a different table each day.
Note
For Python datetime formatting help, see: https://docs.python.org/2/library/datetime.html
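For instance, a sketch combining the scheduled source from earlier with a templated destination table (the query file and table prefix are illustrative), so each day's run writes to a fresh table:

def dailyQuery = CustomerTable.runQuery {
  bq_credentials_file = "credentials.json",
  bq_project = "AcmeCorp",
  bq_dataset = "AcmeCorpSales",
  bq_query_file = "SalesQuery.sql",
  bq_query_dest = "SalesResults_{{ date.today().strftime('%Y%m%d') }}"
}
// assumes the six-field cron format used above; this fires once a day at midnight
def workflow = Sources.schedule<()> { cron = "0 0 0 * * *" } | dailyQuery | Sinks.log<()>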
In the SQL query itself, you have access to the same date and time functionality, including calculating offsets via `timedelta`. E.g. to query last week's table:
SELECT * FROM MyTablePrefix_{{ (date.today() - timedelta(days=7)).strftime('%Y%m%d') }} LIMIT 1000
In the SQL, you can also refer to the function's configuration parameters (as defined in your workflow DSL) via a `config` object.
E.g. to access a parameter named `source_table`, you can write:
SELECT * FROM MyTablePrefix_{{ config.source_table }} LIMIT 1000
and then specify it in the DSL:
runQuery { source_table = "SomeTable" }
Note
You can add as many config parameters to a function as you like, even if they're not normally used by the function.
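For example, a sketch where the `source_table` parameter from above is supplied alongside the required configuration (the file and table names are illustrative):

def templatedQuery = CustomerTable.runQuery {
  bq_credentials_file = "credentials.json",
  bq_project = "AcmeCorp",
  bq_dataset = "AcmeCorpSales",
  bq_query_file = "TemplatedQuery.sql",
  bq_query_dest = "Results",
  source_table = "SomeTable"
}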