Welcome to SensorBee’s Documentation!¶
Contents:
Preface¶
This is the official documentation of SensorBee. It describes all the functionality that the current version of SensorBee officially supports.
This document is structured as follows:
- Preface, this part, provides general information about SensorBee.
- Part I is an introduction for new users through some tutorials.
- Part II documents the syntax and specification of the BQL language.
- Part III describes information for advanced users about extensibility capabilities of the server.
- Reference contains reference information about BQL statements, built-in components, and client programs.
What is SensorBee?¶
SensorBee is an open source, lightweight, stateful streaming data processing engine for the Internet of Things (IoT). SensorBee is designed to be used for streaming ETL (Extract/Transform/Load) at the edge of the network including Fog Computing. In ETL operations, SensorBee mainly focuses on data transformation and data enrichment, especially using machine learning. SensorBee is very small (stand-alone executable file size < 30MB) and runs on small computers such as Raspberry Pi.
The processing flow in SensorBee is written in BQL, a dialect of CQL (Continuous Query Language), which is similar to SQL but extended for streaming data processing. Its internal data structure (tuple) is compatible with JSON documents rather than rows in RDBMSs. Therefore, in addition to regular SQL expressions, BQL implements JSON notation and type conversions that work well with JSON. BQL is also schemaless at the moment to support rapid prototyping and integration.
Note
Supporting a schema in SensorBee is being planned to increase its robustness, debuggability, and speed. However, the version that will support the feature has not been decided yet.
SensorBee manages user-defined states (UDSs) and BQL utilizes those states to perform stateful processing on streaming data. An example of stateful processing is machine learning. Via a Python extension, SensorBee supports deep learning using Chainer, a flexible deep learning framework developed by Preferred Networks, Inc. and Preferred Infrastructure, Inc. The combination of SensorBee and Chainer enables users to support not only online analysis but also online training of deep learning models at the edge of the network with the help of GPUs. Preprocessing of data and feature extraction from preprocessed results can be written in BQL. The results can be computed in an online manner and directly connected to deep learning models implemented with Chainer.
By combining the JSON-like data structure of BQL with machine learning, SensorBee is good at handling unstructured data such as text written in natural languages and even video streams, which are not well supported by most data processing engines. Therefore, SensorBee can operate, for example, between a video camera and Cloud-based (semi-structured) data analytics services so that those services don't have to analyze raw video images and can instead utilize only the information that SensorBee extracts from them.
SensorBee can be extended to work with existing databases or data processing solutions by developing data source or sink plugins. For example, it officially provides plugins for fluentd, an open source data collector, and has various input and output plugins for major databases and Cloud services.
SensorBee has not been designed for:
- very large scale data processing
- massively parallel streaming data processing
- accurate numerical computation without any error
Conventions¶
The following conventions are used in the synopsis of a command:
- Brackets ([ and ]) indicate optional parts.
  - Some statements, such as SELECT, have [ and ] as part of the statement itself. In that case, those brackets are enclosed in single quotes (').
- Braces ({ and }) and vertical lines (|) indicate that one of the candidates in the braces must be chosen (e.g. one of a, b, or c has to be selected in {a | b | c}).
- Dots (...) mean that the preceding element can be repeated.
- Commands that are to be run in a normal system shell are prefixed with a dollar sign ($).
- Types and keywords in BQL are written in fixed-size fonts.
Further Information¶
Besides this documentation, there are other resources about SensorBee:
Website
http://sensorbee.io/ has general information about SensorBee.
Github
The sensorbee organization contains SensorBee’s core source code repository and its official plugins.
Mailing Lists
There are two Google Groups for discussion and questions about SensorBee: https://groups.google.com/forum/#!forum/sensorbee (English) and https://groups.google.com/forum/#!forum/sensorbee-ja (Japanese).
Tutorial¶
The following chapters give an introduction to SensorBee.
Getting Started¶
To get started with SensorBee, this chapter introduces word counting as the first tutorial. It covers the following topics:
- How to install and set up SensorBee
- How to build a custom sensorbee command
- How to use the sensorbee command
- How to query the SensorBee server with sensorbee shell and BQL
Prerequisites¶
SensorBee requires Go 1.4 or later to be installed and its development
environment ($GOPATH
etc.) to be set up correctly. Also, Git needs
to be installed.
This tutorial assumes that readers know about basic Linux commands and basics of SQL.
SensorBee itself doesn’t have to be installed at this point.
Word Count Example¶
As the first tutorial, this section shows a word count example. All programs
and configuration files required for this tutorial are provided in the
wordcount
package of the Github repository
https://github.com/sensorbee/tutorial.
Installing Word Count Example Package¶
The first thing that needs to be done is to go get
the word count example
package in the repository:
$ go get github.com/sensorbee/tutorial/wordcount
This command clones the repository to
$GOPATH/src/github.com/sensorbee/tutorial/wordcount
and also downloads all
dependencies. In the config
subdirectory of that path, there are
configuration files for building and running SensorBee. After go get
successfully downloaded the package, copy those configuration files
to another temporary directory (replace /path/to/ with an appropriate path):
$ mkdir -p /path/to/wordcount
$ cp $GOPATH/src/github.com/sensorbee/tutorial/wordcount/config/* \
/path/to/wordcount/
$ ls /path/to/wordcount
build.yaml
sensorbee.yaml
wordcount.bql
Everything necessary to try this tutorial is ready now except SensorBee. The
next step is to build a custom sensorbee
command that includes the plugins
needed for this tutorial.
Building a sensorbee Executable¶
To build a sensorbee
executable, the build_sensorbee
program needs to
be installed. To do so, issue the following command:
$ go get gopkg.in/sensorbee/sensorbee.v0/...
The build_sensorbee
program is used to build a custom sensorbee
executable with plugins provided by developers.
Then, move to the directory that has configuration files previously copied from
the tutorial package and execute build_sensorbee
:
$ cd /path/to/wordcount
/path/to/wordcount$ build_sensorbee
/path/to/wordcount$ ls
build.yaml
sensorbee
sensorbee.yaml
sensorbee_main.go
wordcount.bql
There are two new files in the directory: sensorbee
and
sensorbee_main.go
. Both of them are automatically generated by the
build_sensorbee
command. sensorbee
is the command to run the SensorBee
server or shell. Under the hood, this command is built from sensorbee_main.go
using go build
.
build_sensorbee
builds a sensorbee
command according to the configuration
in build.yaml
:
/path/to/wordcount$ cat build.yaml
plugins:
- github.com/sensorbee/tutorial/wordcount/plugin
Adding a new Go import path to the plugins section adds a new plugin to the sensorbee command, but this tutorial only uses the wordcount plugin above. Other tutorials will cover this configuration file in more depth.
Run the Server¶
After building the sensorbee command with the plugins for this tutorial, run it as a server:
/path/to/wordcount$ ./sensorbee run
INFO[0000] Setting up the server context config={"logging":
{"log_dropped_tuples":false,"min_log_level":"info","summarize_dropped_tuples":
false,"target":"stderr"},"network":{"listen_on":":15601"},"storage":{"uds":
{"params":{},"type":"in_memory"}},"topologies":{}}
INFO[0000] Starting the server on :15601
sensorbee run starts the SensorBee server. It writes some log messages to the console, but they can be ignored at the moment. The server provides an HTTP JSON API and listens on :15601 by default. However, the API isn't used directly in this tutorial. Instead of controlling the server via the API, this tutorial shows how to use the sensorbee shell command and the BQL language, which is similar to SQL but has some extensions for streaming data.
To test if the server has successfully started, run the following command in another terminal:
$ curl http://localhost:15601/api/v1/runtime_status
{"gomaxprocs":1,"goroot":"/home/pfn/go","goversion":"go1.4.2",
"hostname":"sensorbee-tutorial","num_cgo_call":0,"num_cpu":4,
"num_goroutine":13,"pid":33267,"user":"pfn",
"working_directory":"/path/to/wordcount/"}
The server is working correctly if a response like this is returned.
Setting Up a Topology¶
Once the server has started, open another window or use screen/tmux to get another terminal to interact with the server. The server does nothing right after it starts up. There are a few steps required before you can start working with streaming data.
Firstly, to allow the server to process some data, it needs to have
a topology. A topology is a similar concept to a “database” in RDBMSs. It has
processing components such as data sources, continuous views, and so on.
Use the sensorbee topology create
command to create a new topology
wordcount
for the tutorial:
/path/to/wordcount$ ./sensorbee topology create wordcount
/path/to/wordcount$ echo $?
0
$?
(the return code of the ./sensorbee
command) will be 0
if
the command was successful. Otherwise, it will be non-zero.
Be careful to write ./sensorbee
(and not omit the ./
) in order to use
the executable from your current directory, which has the correct plugins baked in.
Note
Almost everything in SensorBee is volatile at the moment and is reset
every time the server restarts. A topology is dropped when the server shuts
down, too. Therefore, sensorbee topology create wordcount
needs to be
run on each startup of the server until it is specified in a config file for
sensorbee run
later.
In the next step, start sensorbee shell
:
/path/to/wordcount$ ./sensorbee shell -t wordcount
wordcount>
-t wordcount
means that the shell connects to the wordcount
topology
just created. Now it’s time to try some BQL statements. To start, try the EVAL
statement, which evaluates arbitrary expressions supported by BQL:
wordcount> EVAL 1 + 1;
2
wordcount> EVAL power(2.0, 2.5);
5.65685424949238
wordcount> EVAL "Hello" || ", world!";
"Hello, world!"
BQL also supports one-line comments:
wordcount> -- This is a comment
wordcount>
Finally, create a source which generates stream data or reads input data from other stream data sources:
wordcount> CREATE SOURCE sentences TYPE wc_sentences;
wordcount>
This CREATE SOURCE
statement creates a source named sentences
. Its type
is wc_sentences
and it is provided by a plugin in the wordcount
package.
This source emits, on a regular basis, a random sentence consisting of several words, along with the name of the person who wrote it. To receive data (i.e. tuples) emitted from the source, use the SELECT statement:
wordcount> SELECT RSTREAM * FROM sentences [RANGE 1 TUPLES];
{"name":"isabella","text":"dolor consequat ut in ad in"}
{"name":"sophia","text":"excepteur deserunt officia cillum lorem excepteur"}
{"name":"sophia","text":"exercitation ut sed aute ullamco aliquip"}
{"name":"jacob","text":"duis occaecat culpa dolor veniam elit"}
{"name":"isabella","text":"dolore laborum in consectetur amet ut nostrud ullamco"}
...
Type C-c (Ctrl+C) to stop the statement. Details of the statement are not described for now, but it is basically the same as the SELECT statement in SQL except for two things: RSTREAM and RANGE. Those concepts will be explained briefly in the next section.
Querying: Basics¶
This subsection introduces basics of querying with BQL, i.e., the SELECT
statement.
Since it is very similar to SQL’s SELECT
and some basic familiarity with
SQL is assumed, two concepts that don’t exist in SQL are described first.
Then, some features that are also present in SQL will be covered.
Selection¶
The SELECT
statement can partially pick up some fields of input tuples:
wordcount> SELECT RSTREAM name FROM sentences [RANGE 1 TUPLES];
{"name":"isabella"}
{"name":"isabella"}
{"name":"jacob"}
{"name":"isabella"}
{"name":"jacob"}
...
In this example, only the name
field is picked up from input tuples that
have “name” and “text” fields.
BQL is schema-less at the moment and the format of output tuples emitted by a
source must be documented by that source’s author. The SELECT
statement is only able
to report an error at runtime when processing a tuple, not at the time when it is
sent to the server. This is a drawback of being schema-less.
Filtering¶
The SELECT
statement supports filtering with the WHERE
clause as SQL
does:
wordcount> SELECT RSTREAM * FROM sentences [RANGE 1 TUPLES] WHERE name = "sophia";
{"name":"sophia","text":"anim eu occaecat do est enim do ea mollit"}
{"name":"sophia","text":"cupidatat et mollit consectetur minim et ut deserunt"}
{"name":"sophia","text":"elit est laborum proident deserunt eu sed consectetur"}
{"name":"sophia","text":"mollit ullamco ut sunt sit in"}
{"name":"sophia","text":"enim proident cillum tempor esse occaecat exercitation"}
...
This selects only the sentences written by the user sophia. Any expression that results in a bool value can be written in the WHERE clause.
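For example (a small sketch reusing this tutorial's sentences source), boolean operators can be combined in the WHERE clause:
wordcount> SELECT RSTREAM * FROM sentences [RANGE 1 TUPLES]
    WHERE name = "sophia" OR name = "jacob";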
Grouping and Aggregates¶
The GROUP BY
clause is also available in BQL:
wordcount> SELECT ISTREAM name, count(*) FROM sentences [RANGE 60 SECONDS]
GROUP BY name;
{"count":1,"name":"isabella"}
{"count":1,"name":"emma"}
{"count":2,"name":"isabella"}
{"count":1,"name":"jacob"}
{"count":3,"name":"isabella"}
...
{"count":23,"name":"jacob"}
{"count":32,"name":"isabella"}
{"count":33,"name":"isabella"}
{"count":24,"name":"jacob"}
{"count":14,"name":"sophia"}
...
This statement creates groups of users in a 60 second-long window. It returns
pairs of a user and the number of sentences that have been written by that user
in the past 60 seconds. In addition to count
, BQL also provides built-in
aggregate functions such as min
, max
, and so on.
Also note that the statement above uses ISTREAM rather than RSTREAM. With ISTREAM, the statement only reports a new count for the user whose count was just updated, while with RSTREAM it reports counts for all users every time it receives a tuple. Comparing example outputs of the statement with RSTREAM and ISTREAM makes it easier to understand their behaviors. When the statement receives isabella, emma, isabella, jacob, and isabella in this order, RSTREAM reports results as shown below (with some comments):
wordcount> SELECT RSTREAM name, count(*) FROM sentences [RANGE 60 SECONDS]
GROUP BY name;
-- receive "isabella"
{"count":1,"name":"isabella"}
-- receive "emma"
{"count":1,"name":"isabella"}
{"count":1,"name":"emma"}
-- receive "isabella"
{"count":2,"name":"isabella"}
{"count":1,"name":"emma"}
-- receive "jacob"
{"count":2,"name":"isabella"}
{"count":1,"name":"emma"}
{"count":1,"name":"jacob"}
-- receive "isabella"
{"count":3,"name":"isabella"}
{"count":1,"name":"emma"}
{"count":1,"name":"jacob"}
On the other hand, ISTREAM
only emits tuples updated in the current
resulting relation:
wordcount> SELECT ISTREAM name, count(*) FROM sentences [RANGE 60 SECONDS]
GROUP BY name;
-- receive "isabella"
{"count":1,"name":"isabella"}
-- receive "emma", the count of "isabella" isn't updated
{"count":1,"name":"emma"}
-- receive "isabella"
{"count":2,"name":"isabella"}
-- receive "jacob"
{"count":1,"name":"jacob"}
-- receive "isabella"
{"count":3,"name":"isabella"}
This is one typical situation where ISTREAM
works well.
Tokenizing Sentences¶
To perform word counting, the sentences emitted from the sentences source need to be split up into words. Imagine there was a user-defined function (UDF) tokenize(sentence) returning an array of strings:
SELECT RSTREAM name, tokenize(text) AS words FROM sentences ...
A resulting tuple of this statement would look like:
{
"name": "emma",
"words": ["exercitation", "ut", "sed", "aute", "ullamco", "aliquip"]
}
However, to count words with the GROUP BY
clause and the count
function,
the tuple above further needs to be split into multiple tuples so that each tuple has
one word instead of an array:
{"name": "emma", "word": "exercitation"}
{"name": "emma", "word": "ut"}
{"name": "emma", "word": "sed"}
{"name": "emma", "word": "aute"}
{"name": "emma", "word": "ullamco"}
{"name": "emma", "word": "aliquip"}
With such a stream, the statement below could easily compute the count of each word:
SELECT ISTREAM word, count(*) FROM some_stream [RANGE 60 SECONDS]
GROUP BY word;
To create a stream like this from tuples emitted from sentences
, BQL
has the concept of a user-defined stream-generating function (UDSF). A UDSF is able
to emit multiple tuples from one input tuple, something that cannot be done with the
SELECT
statement itself. The wordcount
package from this tutorial provides
a UDSF wc_tokenizer(stream, field)
, where stream
is the name of the input
stream and field
is the name of the field containing a sentence to be
tokenized. Both arguments need to be string values.
wordcount> SELECT RSTREAM * FROM wc_tokenizer("sentences", "text") [RANGE 1 TUPLES];
{"name":"ethan","text":"duis"}
{"name":"ethan","text":"lorem"}
{"name":"ethan","text":"adipiscing"}
{"name":"ethan","text":"velit"}
{"name":"ethan","text":"dolor"}
...
In this example, wc_tokenizer
receives tuples from the sentences
stream
and tokenizes sentences contained in the text
field of input tuples. Then,
it emits each tokenized word as a separate tuple.
Note
As shown above, a UDSF is one of the most powerful tools to extend BQL’s capability. It can virtually do anything that can be done for stream data. To learn how to develop it, see User-Defined Stream-Generating Functions.
Creating a Stream¶
Although it is now possible to count tokenized words, it is easier to have something like a "view" in SQL to avoid writing wc_tokenizer("sentences", "text") every time a new query is issued. For this, BQL has streams (a.k.a. continuous views), which work much like views in RDBMSs. A stream can be created using the CREATE STREAM statement:
wordcount> CREATE STREAM words AS
SELECT RSTREAM name, text AS word
FROM wc_tokenizer("sentences", "text") [RANGE 1 TUPLES];
wordcount>
This statement creates a new stream called words. The stream renames the text field to word. The stream can then be referred to in the FROM clause of a SELECT statement as follows:
wordcount> SELECT RSTREAM * FROM words [RANGE 1 TUPLES];
{"name":"isabella","word":"pariatur"}
{"name":"isabella","word":"adipiscing"}
{"name":"isabella","word":"id"}
{"name":"isabella","word":"et"}
{"name":"isabella","word":"aute"}
...
A stream can be specified in the FROM
clause of multiple SELECT
statements and all those statements will receive the same tuples from
the stream.
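For example (a sketch; the two statements would be run from two separate shell sessions), both of the following statements receive the same tuples from words:
wordcount> SELECT RSTREAM name FROM words [RANGE 1 TUPLES];
wordcount> SELECT ISTREAM word, count(*) FROM words [RANGE 60 SECONDS] GROUP BY word;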
Counting Words¶
After creating the words
stream, words can be counted as follows:
wordcount> SELECT ISTREAM word, count(*) FROM words [RANGE 60 SECONDS]
GROUP BY word;
{"count":1,"word":"aute"}
{"count":1,"word":"eu"}
{"count":1,"word":"quis"}
{"count":1,"word":"adipiscing"}
{"count":1,"word":"ut"}
...
{"count":47,"word":"mollit"}
{"count":35,"word":"tempor"}
{"count":100,"word":"in"}
{"count":38,"word":"sint"}
{"count":79,"word":"dolor"}
...
This statement counts the number of occurrences of each word that appeared in the past 60
seconds. By creating another stream based on the SELECT
statement above,
further statistical information can be obtained:
wordcount> CREATE STREAM word_counts AS
SELECT ISTREAM word, count(*) FROM words [RANGE 60 SECONDS]
GROUP BY word;
wordcount> SELECT RSTREAM max(count), min(count)
FROM word_counts [RANGE 60 SECONDS];
{"max":52,"min":52}
{"max":120,"min":52}
{"max":120,"min":50}
{"max":165,"min":50}
{"max":165,"min":45}
...
{"max":204,"min":31}
{"max":204,"min":30}
{"max":204,"min":29}
{"max":204,"min":28}
{"max":204,"min":27}
...
The CREATE STREAM statement above creates a new stream word_counts. The next SELECT statement computes the maximum and minimum counts over the words observed in the past 60 seconds.
Using a BQL File¶
All statements above will be cleared once the SensorBee server is restarted. By using a BQL file, a topology can be set up on each startup of the server. A BQL file can contain multiple BQL statements. For the statements used in this tutorial, the file would look as follows:
CREATE SOURCE sentences TYPE wc_sentences;
CREATE STREAM words AS
SELECT RSTREAM name, text AS word
FROM wc_tokenizer("sentences", "text") [RANGE 1 TUPLES];
CREATE STREAM word_counts AS
SELECT ISTREAM word, count(*)
FROM words [RANGE 60 SECONDS]
GROUP BY word;
Note
A BQL file cannot have the SELECT
statement because it runs
continuously until it is manually stopped.
To run the BQL file on the server, a configuration file for sensorbee run
needs to be provided in YAML format. The name of the configuration file is often
sensorbee.yaml
. For this tutorial, the file has the following content:
topologies:
wordcount:
bql_file: wordcount.bql
topologies
is one of the top-level parameters related to topologies in
the server. It has names of topologies to be created on startup. In the file
above, there’s only one topology wordcount
. Each topology has a bql_file
parameter to specify which BQL file to execute. The wordcount.bql
file was
copied to the current directory before and the configuration file above specifies it.
With this configuration file, the SensorBee server can be started as follows:
/path/to/wordcount$ ./sensorbee run -c sensorbee.yaml
INFO[0000] Setting up the server context config={"logging":
{"log_dropped_tuples":false,"min_log_level":"info","summarize_dropped_tuples":
false,"target":"stderr"},"network":{"listen_on":":15601"},"storage":{"uds":
{"params":{},"type":"in_memory"}},"topologies":{"wordcount":{"bql_file":
"wordcount.bql"}}}
INFO[0000] Setting up the topology topology=wordcount
INFO[0000] Starting the server on :15601
As written in log messages, the topology wordcount
is created before
the server actually starts.
Summary¶
This tutorial provided a brief overview of SensorBee through word counting.
First, it showed how to build a custom sensorbee command for the tutorial.
Second, it explained how to run the server and set up a topology with BQL.
Then, it introduced querying streams and creating a new stream with SELECT.
Finally, word counting was performed over the newly created stream, and the
BQL statements that create the source and streams were persisted in a BQL
file so that the server can re-execute them on startup.
In subsequent sections, there are more tutorials and samples to learn how to integrate SensorBee with other tools and libraries.
Using Machine Learning¶
This chapter describes how to use machine learning on SensorBee.
In this tutorial, SensorBee retrieves tweets written in English from Twitter’s public stream using Twitter’s Sample API. SensorBee adds two labels to each tweet: age and gender. Tweets are labeled (“classified”) using machine learning.
The following sections show how to install dependencies, set them up, and apply machine learning to tweets using SensorBee.
Note
Due to the way the Twitter client receives tweets from Twitter, this tutorial's demonstration may not appear very smooth. For example, the client receives around 50 tweets in 100 ms, then stops for 900 ms, and repeats this pattern every second. So, it is easy to get the impression that SensorBee and its machine learning library are doing mini-batch processing, but they actually do not.
Prerequisites¶
This tutorial requires the following software to be installed:
Ruby 2.1.4 or later
Elasticsearch 2.2.0 or later
To check that the data arrives properly in Elasticsearch and to see how this data could be visualized, also install:
Kibana 4.4.0 or later
However, explanations on how to configure and use Kibana for data visualization are not part of this tutorial. It is assumed that Elasticsearch and Kibana are running on the same host as SensorBee. However, SensorBee can be configured to use services running on a different host.
In addition, Go 1.4 or later and Git are required, as described in Getting Started.
Quick Set Up Guide¶
If there are no Elasticsearch and Kibana instances that can be used for this tutorial, they need to be installed. Skip this subsection if they are already installed. In case an error occurs, look up the documentation at http://www.elastic.co/.
Installing and Running Elasticsearch¶
Download the package from https://www.elastic.co/downloads/elasticsearch
and extract the compressed file. Then, run bin/elasticsearch
in the
directory with the extracted files:
/path/to/elasticsearch-2.2.0$ bin/elasticsearch
... log messages ...
Note that a Java runtime is required to run the command above.
To see if Elasticsearch is running, access the server with the curl command:
$ curl http://localhost:9200/
{
"name" : "Peregrine",
"cluster_name" : "elasticsearch",
"version" : {
"number" : "2.2.0",
"build_hash" : "8ff36d139e16f8720f2947ef62c8167a888992fe",
"build_timestamp" : "2016-01-27T13:32:39Z",
"build_snapshot" : false,
"lucene_version" : "5.4.1"
},
"tagline" : "You Know, for Search"
}
Installing and Running Kibana¶
Download the package from https://www.elastic.co/downloads/kibana and
extract the compressed file. Then, run bin/kibana
in the directory
with the extracted files:
/path/to/kibana-4.4.0$ bin/kibana
... log messages ...
Access http://localhost:5601/ with a Web browser. Kibana is running correctly if it shows a page saying "Configure an index pattern". Since Elasticsearch does not have any data yet, no further operation is necessary at the moment. Further configuration steps are described in the Running SensorBee section.
Installation and Setup¶
At this point, the environment described in the previous section is assumed to be installed correctly and working. Now, some more components need to be set up before continuing this tutorial.
Installing the Tutorial Package¶
To set up the system, go get the tutorial package first:
$ go get github.com/sensorbee/tutorial/ml
The package contains configuration files in the config
subdirectory
that are necessary for the tutorial. Create a temporary directory and copy those
files to the directory (replace /path/to/ with an appropriate path):
$ mkdir -p /path/to/sbml
$ cp -r $GOPATH/src/github.com/sensorbee/tutorial/ml/config/* /path/to/sbml/
$ cd /path/to/sbml
/path/to/sbml$ ls
Gemfile
build.yaml
fluent.conf
sensorbee.yaml
train.bql
twitter.bql
uds
Installing and Running fluentd¶
This tutorial, and SensorBee, relies on fluentd. fluentd is an open source data collector that provides many input and output plugins to connect with a wide variety of databases including Elasticsearch. Skip this subsection if fluentd is already installed.
To install fluentd for this tutorial, bundler needs to be installed with
the gem
command. To see if it’s already installed, run gem list
.
Something like bundler (1.11.2)
shows up if it’s already installed:
/path/to/sbml$ gem list | grep bundler
bundler (1.11.2)
/path/to/sbml$
Otherwise, install bundler with gem install bundler
. It may require admin
privileges (i.e. sudo
):
/path/to/sbml$ gem install bundler
Fetching: bundler-1.11.2.gem (100%)
Successfully installed bundler-1.11.2
Parsing documentation for bundler-1.11.2
Installing ri documentation for bundler-1.11.2
Done installing documentation for bundler after 3 seconds
1 gem installed
/path/to/sbml$
After installing bundler, run the following command to install fluentd and its plugins under the /path/to/sbml directory (in order to build the gems, you may have to install the Ruby header files beforehand):
/path/to/sbml$ bundle install --path vendor/bundle
Fetching gem metadata from https://rubygems.org/............
Fetching version metadata from https://rubygems.org/..
Resolving dependencies...
Installing cool.io 1.4.3 with native extensions
Installing multi_json 1.11.2
Installing multipart-post 2.0.0
Installing excon 0.45.4
Installing http_parser.rb 0.6.0 with native extensions
Installing json 1.8.3 with native extensions
Installing msgpack 0.5.12 with native extensions
Installing sigdump 0.2.4
Installing string-scrub 0.0.5 with native extensions
Installing thread_safe 0.3.5
Installing yajl-ruby 1.2.1 with native extensions
Using bundler 1.11.2
Installing elasticsearch-api 1.0.15
Installing faraday 0.9.2
Installing tzinfo 1.2.2
Installing elasticsearch-transport 1.0.15
Installing tzinfo-data 1.2016.1
Installing elasticsearch 1.0.15
Installing fluentd 0.12.20
Installing fluent-plugin-elasticsearch 1.3.0
Bundle complete! 2 Gemfile dependencies, 20 gems now installed.
Bundled gems are installed into ./vendor/bundle.
/path/to/sbml$
With the --path vendor/bundle option, all Ruby gems required for this tutorial are installed locally in the /path/to/sbml/vendor/bundle directory. To confirm that fluentd is correctly installed, run the command below:
/path/to/sbml$ bundle exec fluentd --version
fluentd 0.12.20
/path/to/sbml$
If it prints the version, the installation is complete and fluentd is ready to be used.
Once fluentd is installed, run it with the provided configuration file:
/path/to/sbml$ bundle exec fluentd -c fluent.conf
2016-02-05 16:02:10 -0800 [info]: reading config file path="fluent.conf"
2016-02-05 16:02:10 -0800 [info]: starting fluentd-0.12.20
2016-02-05 16:02:10 -0800 [info]: gem 'fluentd' version '0.12.20'
2016-02-05 16:02:10 -0800 [info]: gem 'fluent-plugin-elasticsearch' version '1.3.0'
2016-02-05 16:02:10 -0800 [info]: adding match pattern="sensorbee.tweets" type="...
2016-02-05 16:02:10 -0800 [info]: adding source type="forward"
2016-02-05 16:02:10 -0800 [info]: using configuration file: <ROOT>
<source>
@type forward
@id forward_input
</source>
<match sensorbee.tweets>
@type elasticsearch
host localhost
port 9200
include_tag_key true
tag_key @log_name
logstash_format true
flush_interval 1s
</match>
</ROOT>
2016-02-05 16:02:10 -0800 [info]: listening fluent socket on 0.0.0.0:24224
Some log messages are truncated with ...
at the end of each line.
The configuration file fluent.conf is provided as a part of this tutorial. It defines a data source using in_forward and a destination that is connected to Elasticsearch. If the Elasticsearch server is running on a different host or using a port number other than 9200, edit fluent.conf:
<source>
@type forward
@id forward_input
</source>
<match sensorbee.tweets>
@type elasticsearch
host {custom host name}
port {custom port number}
include_tag_key true
tag_key @log_name
logstash_format true
flush_interval 1s
</match>
Also, feel free to change other parameters to adjust the configuration to the actual environment. Parameters for the Elasticsearch plugin are described at https://github.com/uken/fluent-plugin-elasticsearch.
Create Twitter API Key¶
This tutorial requires Twitter’s API keys. To create keys, visit Application Management. Once a new application is created, click the application and its “Keys and Access Tokens” tab. The page should show 4 keys:
- Consumer Key (API Key)
- Consumer Secret (API Secret)
- Access Token
- Access Token Secret
Then, create the api_key.yaml file in the /path/to/sbml directory and copy the keys into it as follows:
/path/to/sbml$ cat api_key.yaml
consumer_key: <Consumer Key (API Key)>
consumer_secret: <Consumer Secret (API Secret)>
access_token: <Access Token>
access_token_secret: <Access Token Secret>
Replace each key’s value with the actual values shown in Twitter’s application management page.
Running SensorBee¶
All requirements for this tutorial have been installed and set up. The next
step is to install build_sensorbee
, then build and run the sensorbee
executable:
/path/to/sbml$ go get gopkg.in/sensorbee/sensorbee.v0/...
/path/to/sbml$ build_sensorbee
sensorbee_main.go
/path/to/sbml$ ./sensorbee run -c sensorbee.yaml
INFO[0000] Setting up the server context config={"logging":
{"log_dropped_tuples":false,"min_log_level":"info","summarize_dropped_tuples":
false,"target":"stderr"},"network":{"listen_on":":15601"},"storage":{"uds":
{"params":{"dir":"uds"},"type":"fs"}},"topologies":{"twitter":{"bql_file":
"twitter.bql"}}}
INFO[0000] Setting up the topology topology=twitter
INFO[0000] Starting the server on :15601
Because SensorBee loads pre-trained machine learning models on its startup,
it may take a while to set up a topology. After the server shows the
message Starting the server on :15601
, access Kibana at
http://localhost:5601/. If the setup operations performed so far have been
successful, it returns the page as shown below with a green “Create” button:

(If the button is not visible, see the section on Troubleshooting below.) Click the “Create” button to work with data coming from SensorBee. After the action is completed, you should see a list of fields that were found in the data stored so far. If you click “Discover” in the top menu, a selection of the tweets and a diagram with the tweet frequency should be visible.
Kibana can now be used to visualize and search through the data in Elasticsearch. Although this tutorial doesn’t describe the usage of Kibana, many tutorials and examples can be found on the Web. The picture below shows an example chart showing some classification metrics:

Troubleshooting¶
If Kibana doesn't show the "Create" button, something may not be working properly. First, enter sensorbee shell to check whether SensorBee is working:
/path/to/sbml$ ./sensorbee shell -t twitter
twitter>
Then, issue the following SELECT
statement:
twitter> SELECT RSTREAM * FROM public_tweets [RANGE 1 TUPLES];
... tweets show up here ...
If the statement returns an error or doesn't show any tweets:
- The host may not be connected to Twitter. Check the internet connection with commands such as ping.
- The API key written in api_key.yaml may be wrong.
When the statement above shows tweets, query another stream:
twitter> SELECT RSTREAM * FROM labeled_tweets [RANGE 1 TUPLES];
... tweets show up here ...
If the statement doesn’t show any tweets, the format of tweets may have been
changed since the time of this writing. If so, modify BQL statements in
twitter.bql
to support the new format. BQL Statements and Plugins
describes what each statement does.
If the statement above does print tweets, then fluentd or Elasticsearch may not have been started yet. Check that they are running correctly.
For other errors, report them to https://github.com/sensorbee/tutorial.
BQL Statements and Plugins¶
This section describes how SensorBee produced the output seen in the previous section: how it loads tweets from Twitter, preprocesses them for machine learning, and finally classifies them to extract demographic information about each tweet. twitter.bql in the config directory contains all BQL statements used in this tutorial. The following subsections explain what each statement does. To interact with some streams created by twitter.bql, open another terminal (while the sensorbee instance from the previous section is still running) and launch sensorbee shell:
/path/to/sbml$ ./sensorbee shell -t twitter
twitter>
In the following sections of this tutorial, statements prefixed with
twitter>
can be executed in the SensorBee shell; statements without this prefix
are statements from the twitter.bql
file.
Creating a Twitter Source¶
This tutorial does not work without retrieving the public timeline of Twitter using the Sample API. The Sample API is provided for free to retrieve a portion of tweets sampled from the public timeline.
The github.com/sensorbee/twitter
package provides a plugin for public timeline retrieval. The source provided by that plugin has the type
twitter_public_stream
. The plugin can be registered to the SensorBee
server by adding github.com/sensorbee/twitter/plugin
to the build.yaml
configuration file for build_sensorbee
. Now consider the first statement
in the twitter.bql
file:
CREATE SOURCE public_tweets TYPE twitter_public_stream
WITH key_file = "api_key.yaml";
This statement creates a new source with the name public_tweets
. To retrieve raw
tweets from that source, run the following SELECT
statement in the
SensorBee shell:
twitter> SELECT RSTREAM * FROM public_tweets [RANGE 1 TUPLES];
Note
For simplicity, a relative path is specified as the key_file
parameter.
However, it is usually recommended to pass an absolute path when
running the SensorBee server as a daemon.
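For example, when running the server as a daemon, the same statement could be written with an absolute path (using this tutorial's placeholder directory):
CREATE SOURCE public_tweets TYPE twitter_public_stream
    WITH key_file = "/path/to/sbml/api_key.yaml";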
Preprocessing Tweets and Extracting Features for Machine Learning¶
Before applying machine learning to tweets, they need to be converted into another form of information so that machine learning algorithms can utilize them. The conversion consists of two tasks: preprocessing and feature extraction. Preprocessing generally involves data cleansing, filtering, normalization, and so on. Feature extraction transforms preprocessed data into several pieces of information (i.e. features) that machine learning algorithms can “understand”.
Which preprocessing or feature extraction methods are required for machine learning varies depending on the format or data type of input data or machine learning algorithms to be used. Therefore, this tutorial only shows one example of applying a classification algorithm to English tweets.
Selecting Meaningful Fields of English Tweets¶
Because this tutorial targets English tweets, tweets written in other languages need to be removed. This can be done with the WHERE clause, as you can check in the SensorBee shell:
twitter> SELECT RSTREAM * FROM public_tweets [RANGE 1 TUPLES]
WHERE lang = "en";
Tweets have a lang field and it can be used for the filtering. In addition, not all fields in a raw tweet are required for machine learning, so removing unnecessary fields keeps the data simple and clean:
CREATE STREAM en_tweets AS
SELECT RSTREAM
"sensorbee.tweets" AS tag, id_str AS id, lang, text,
user.screen_name AS screen_name, user.description AS description
FROM public_tweets [RANGE 1 TUPLES]
WHERE lang = "en";
This statement creates a new stream en_tweets. It selects only English tweets via WHERE lang = "en". "sensorbee.tweets" AS tag is used by the fluentd sink later. The tuples in that stream will look like:
{
"tag": "sensorbee.tweets",
"id": "the string representation of tweet's id",
"lang": "en",
"text": "the contents of the tweet",
"screen_name": "user's @screen_name",
"description": "user's profile description"
}
Note
The AS in user.screen_name AS screen_name is required at the moment. Without it, the field would get an auto-generated name like col_n. This is because user.screen_name could be evaluated as a JSON Path and might result in multiple return values, so it cannot be named properly. This behavior may change in a future version.
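As a hedged sketch of the difference (the auto-generated name in the comment is only illustrative):
-- without AS, the selected field gets an auto-generated name (something like col_1)
SELECT RSTREAM user.screen_name FROM public_tweets [RANGE 1 TUPLES];
-- with AS, the field is named "screen_name"
SELECT RSTREAM user.screen_name AS screen_name FROM public_tweets [RANGE 1 TUPLES];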
Removing Noise¶
Noise that is meaningless and could be harmful to machine learning algorithms needs to be removed. The field of natural language processing (NLP) has developed many methods for this purpose and they can be found in a wide variety of articles. However, this tutorial only applies some of the most basic operations to each tweet.
CREATE STREAM preprocessed_tweets AS
SELECT RSTREAM
filter_stop_words(
nlp_split(
nlp_to_lower(filter_punctuation_marks(text)),
" ")) AS text_vector,
filter_stop_words(
nlp_split(
nlp_to_lower(filter_punctuation_marks(description)),
" ")) AS description_vector,
*
FROM en_tweets [RANGE 1 TUPLES];
The statement above creates a new stream preprocessed_tweets
from
en_tweets
. It adds two fields to the tuple emitted from en_tweets
:
text_vector
and description_vector
. As for preprocessing, the
statement applies the following methods to the text
and description
fields:
- Remove punctuation marks
- Change uppercase letters to lowercase
- Remove stopwords
First of all, punctuation marks are removed by the user-defined function (UDF) filter_punctuation_marks. It is provided in a plugin for this tutorial in the github.com/sensorbee/tutorial/ml package. The UDF removes some punctuation marks such as ",", ".", or "()" from a string.
Note
Emoticons such as ":)" may play a very important role in classification
tasks like sentiment estimation. However, filter_punctuation_marks
simply removes most of them for simplicity. Develop a better UDF to solve
this issue as an exercise.
Second, all uppercase letters are converted into lowercase letters by the nlp_to_lower UDF. The UDF is registered in the github.com/sensorbee/nlp/plugin package. Because a letter is a mere byte code and the values of "a" and "A" are different, machine learning algorithms consider "word" and "Word" to have different meanings. To avoid that confusion, all letters should be "normalized".
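Assuming the plugins for this tutorial are loaded, the effect can be checked directly in the shell with EVAL (a sketch; the exact output formatting may differ):
twitter> EVAL nlp_to_lower("Hello World");
"hello world"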
Note
Of course, some words should be distinguished by explicitly starting with an uppercase. For example, “Mike” could be a name of a person, but changing it to “mike” could make the word vague.
Finally, all stopwords are removed. Stopwords are words that appear too often
and don’t provide any insight for classification. Stopword filtering in this
tutorial is done in two steps: tokenization and filtering. To perform a
dictionary-based stopword filtering, the content of a tweet needs to be
tokenized. Tokenization is a process that converts a sentence into a sequence
of words. In English, “I like sushi” will be tokenized as
["I", "like", "sushi"]
. Although tokenization isn’t as simple as just
splitting words by white spaces, the preprocessed_tweets
stream just splits by white spaces for simplicity, using the UDF nlp_split
, which is defined in the
github.com/sensorbee/nlp
package. nlp_split
takes two arguments: a
sentence and a splitter. In the statement, contents are split by a white
space. nlp_split
returns an array of strings. Then, the UDF
filter_stop_words
takes the return value of nlp_split
and removes
stopwords contained in the array. filter_stop_words
is provided as a part
of this tutorial in the github.com/sensorbee/tutorial/ml
package. It’s a mere
example UDF and doesn’t provide perfect stopword filtering.
As a result, both text_vector and description_vector hold an array of words like ["i", "want", "eat", "sushi"], created from the sentence "I want to eat sushi.".
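The whole preprocessing chain can also be tried interactively in the shell; a sketch (output formatting may differ slightly):
twitter> EVAL filter_stop_words(nlp_split(nlp_to_lower(
    filter_punctuation_marks("I want to eat sushi.")), " "));
["i","want","eat","sushi"]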
The preprocessing shown so far is very similar to the preprocessing required for full-text search engines, so many valuable resources can be found in that field, including the Elasticsearch documentation.
Note
For other preprocessing approaches such as stemming, refer to natural language processing textbooks.
Creating Features¶
In NLP, a bag-of-words representation is usually used as a feature for machine learning algorithms. A bag-of-words consists of pairs of a word and its weight. Weight could be any numerical value and usually something related to term frequency (TF) is used. A sequence of the pairs is called a feature vector.
A feature vector can be expressed as an array of weights. Each word in all tweets observed by a machine learning algorithm corresponds to a particular position of the array. For example, the weight of the word “want” may be 4th element of the array.
A feature vector for NLP data could be very long because tweets contain many words. However, each vector would be sparse due to the maximum length of tweets. Even if machine learning algorithms observe more than 100,000 words and use them as features, each tweet only contains around 30 or 40 words. Therefore, each feature vector is very sparse, that is, only a small number of its elements have a non-zero weight. In such cases, a feature vector can effectively be expressed as a map:
{
"word": weight,
"word": weight,
...
}
This tutorial uses online classification algorithms that are imported from Jubatus, a distributed online machine learning server. These algorithms accept the following form of data as a feature vector:
{
"word1": 1,
"key1": {
"word2": 2,
"word3": 1.5,
},
"word4": [1.1, 1.2, 1.3]
}
The SensorBee terminology for that kind of data structure is “map”. A map can be nested and its value can be an array containing weights. The map above is converted to something like:
{
"word1": 1,
"key1/word2": 2,
"key1/word3": 1.5,
"word4[0]": 1.1,
"word4[1]": 1.2,
"word4[2]": 1.3
}
The actual feature vectors for the tutorial are created in the fv_tweets
stream:
CREATE STREAM fv_tweets AS
SELECT RSTREAM
{
"text": nlp_weight_tf(text_vector),
"description": nlp_weight_tf(description_vector)
} AS feature_vector,
tag, id, screen_name, lang, text, description
FROM preprocessed_tweets [RANGE 1 TUPLES];
As described earlier, text_vector and description_vector are arrays of words. The nlp_weight_tf function defined in the github.com/sensorbee/nlp package computes a feature vector from such an array. The weight is the term frequency (i.e. the number of occurrences of a word). The result is a map expressing a sparse vector as described above. To see what the feature_vector looks like, just issue a SELECT statement on the fv_tweets stream.
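For example (a sketch; the actual words and weights depend on the incoming tweets):
twitter> SELECT RSTREAM feature_vector FROM fv_tweets [RANGE 1 TUPLES];
{"feature_vector":{"description":{"sushi":1},"text":{"eat":1,"sushi":1,"want":1}}}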
All required preprocessing and feature extraction have been completed and it’s now ready to apply machine learning to tweets.
Applying Machine Learning¶
The fv_tweets
stream now has all the information required by a machine
learning algorithm to classify tweets. To apply the algorithm to each tweet,
pre-trained machine learning models have to be loaded:
LOAD STATE age_model TYPE jubaclassifier_arow
OR CREATE IF NOT SAVED
WITH label_field = "age", regularization_weight = 0.001;
LOAD STATE gender_model TYPE jubaclassifier_arow
OR CREATE IF NOT SAVED
WITH label_field = "gender", regularization_weight = 0.001;
In SensorBee, machine learning models are expressed as user-defined states
(UDSs). In the statement above, two models are loaded: age_model
and
gender_model
. These models contain the necessary information to classify gender and
age of the user of each tweet. The model files are located in the uds
directory
that was copied from the package’s config
directory beforehand:
/path/to/sbml$ ls uds
twitter-age_model-default.state
twitter-gender_model-default.state
These filenames were automatically assigned by the SensorBee server when the SAVE STATE statement was issued. This will be described later.
Both models have the type jubaclassifier_arow
imported from
Jubatus. The UDS type is implemented in the
github.com/sensorbee/jubatus/classifier
package. jubaclassifier_arow
implements the AROW online linear classification
algorithm [Crammer09]. Parameters specified in the WITH
clause are related
to training and will be described later.
After loading the models as UDSs, the machine learning algorithm is ready to work:
CREATE STREAM labeled_tweets AS
SELECT RSTREAM
juba_classified_label(jubaclassify("age_model", feature_vector)) AS age,
juba_classified_label(jubaclassify("gender_model", feature_vector)) AS gender,
tag, id, screen_name, lang, text, description
FROM fv_tweets [RANGE 1 TUPLES];
The labeled_tweets
stream emits tweets with age
and gender
labels.
The jubaclassify
UDF performs classification based on the given model.
twitter> EVAL jubaclassify("gender_model", {
"text": {"i": 1, "wanna": 1, "eat":1, "sushi":1},
"description": {"i": 1, "need": 1, "sushi": 1}
});
{"male":0.021088751032948494,"female":-0.020287269726395607}
jubaclassify
returns a map of labels and their scores as shown above. The
higher the score of a label, the more likely a tweet has the label. To choose
the label having the highest score, the juba_classified_label
function is
used:
twitter> EVAL juba_classified_label({
"male":0.021088751032948494,"female":-0.020287269726395607});
"male"
jubaclassify
and juba_classified_label
functions are also defined in
the github.com/sensorbee/jubatus/classifier
package.
[Crammer09] Koby Crammer, Alex Kulesza, and Mark Dredze, "Adaptive Regularization of Weight Vectors", Advances in Neural Information Processing Systems, 2009.
Inserting Labeled Tweets Into Elasticsearch via Fluentd¶
Finally, tweets labeled by machine learning need to be inserted into Elasticsearch for visualization. This is done via fluentd which was previously set up.
CREATE SINK fluentd TYPE fluentd;
INSERT INTO fluentd from labeled_tweets;
SensorBee provides fluentd plugins in the github.com/sensorbee/fluentd package. The fluentd sink writes tuples into fluentd's forward input plugin running on the same host.
After creating the sink, the INSERT INTO
statement starts writing tuples
from a source or a stream into it. This statement is the last one in the
twitter.bql
file and also concludes this section. All the steps from
connecting to the Twitter API, transforming tweets and analyzing them using
Jubatus have been shown in this section. The last part of this tutorial shows how the previously loaded model files were trained.
Training¶
The previous section used the machine learning models that were already trained
but it was not described how to train them. This section explains how machine
learning models can be trained with BQL and the sensorbee
command.
Preparing Training Data¶
Because the machine learning algorithm used in this tutorial is supervised learning, it requires a training data set to create models. Each training sample is a pair of the original data and its label. There is no common format for a training data set; the format can vary depending on the use case. In this tutorial, a training data set consists of multiple lines, each of which contains exactly one JSON object.
{"description":"I like sushi.", ...}
{"text":"I wanna eat sushi.", ...}
...
In addition, each JSON object needs to have two fields “age” and “gender”:
{"age":"10-19","gender":"male", ...other original fields...}
{"age":"20-29","gender":"female", ...other original fields...}
...
In the pre-trained model, age and gender have the following labels:
- age: 10-19, 20-29, 30-39, 40-49, 50<
- gender: male, female
Both age and gender can have additional labels if necessary. Labels can be empty
if they are not known for sure. After annotating each tweet, the training data set needs
to be saved as training_tweets.json
in the /path/to/sbml
directory.
The training data set used for the pre-trained models contains 4974 gender labels and 14747 age labels.
Training¶
Once the training data set has been prepared, the models can be trained with the following command:
/path/to/sbml$ ./sensorbee runfile -t twitter -c sensorbee.yaml -s '' train.bql
sensorbee runfile
executes BQL statements written in a given file,
e.g. train.bql
in the command above. -t twitter
means the name of the
topology is twitter
. The name is used for the filenames of saved models
later. -c sensorbee.yaml
passes the same configuration file as the one
used previously. -s ''
means sensorbee runfile
saves all UDSs after the
topology stops.
After running the command above, two models (UDSs) are saved in the uds
directory. The saved models can be loaded by the LOAD STATE
statement.
BQL Statements¶
All BQL statements for training are written in train.bql
. Most statements
in the file overlap with twitter.bql
, so only differences will be explained.
CREATE STATE age_model TYPE jubaclassifier_arow
WITH label_field = "age", regularization_weight = 0.001;
CREATE SINK age_model_trainer TYPE uds WITH name = "age_model";
CREATE STATE gender_model TYPE jubaclassifier_arow
WITH label_field = "gender", regularization_weight = 0.001;
CREATE SINK gender_model_trainer TYPE uds WITH name = "gender_model";
These statements create UDSs for the age and gender classification models. The CREATE STATE statements use the same parameters as the corresponding LOAD STATE ... OR CREATE IF NOT SAVED statements in twitter.bql. The CREATE SINK
statements above create new sinks with the
type uds
. The uds
sink writes tuples into the UDS specified as name
if the UDS
supports it. jubaclassifier_arow
supports writing tuples. When a tuple is written to it, it trains the model using that tuple as training data. It assumes that the tuple has two fields: a feature vector field and a label field.
By default, a feature vector and a label are obtained by the feature_vector
field and the label
field in a tuple, respectively. In this tutorial, each
tuple has two labels: age
and gender
. Therefore, the field names of
those fields need to be customized. The field names can be specified by the
label_field
parameter in the WITH
clause of the CREATE STATE
statement. In the statements above, age_model
and gender_model
UDSs
obtain labels from the age
field and the gender
field, respectively.
CREATE PAUSED SOURCE training_data TYPE file WITH path = "training_tweets.json";
This statement creates a source which inputs tuples from a file.
training_tweets.json
is the file prepared previously and contains training
data. The source is created with the PAUSED
flag, so it doesn’t emit any
tuple until all other components in the topology are set up and the
RESUME SOURCE
statement is issued.
The en_tweets, preprocessed_tweets, and fv_tweets streams are the same as the ones in twitter.bql, except that the tweets are emitted from the file source rather than the twitter_public_stream source.
CREATE STREAM age_labeled_tweets AS
SELECT RSTREAM * FROM fv_tweets [RANGE 1 TUPLES] WHERE age != "";
CREATE STREAM gender_labeled_tweets AS
SELECT RSTREAM * FROM fv_tweets [RANGE 1 TUPLES] WHERE gender != "";
These statements create new streams that only emit tuples having a label for training.
INSERT INTO age_model_trainer FROM age_labeled_tweets;
INSERT INTO gender_model_trainer FROM gender_labeled_tweets;
Then, those filtered tuples are written into models (UDSs) via the uds
sinks
created earlier.
RESUME SOURCE training_data;
All streams are set up and the training_data
source is finally resumed.
With the sensorbee runfile
command, all statements run until all tuples
emitted from the training_data
source are processed.
When BQL statements are run on the server, the SAVE STATE
statement is
usually used to save UDSs. However, sensorbee runfile
optionally saves UDSs
after the topology is stopped. Therefore, train.bql
doesn’t issue
SAVE STATE
statements.
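For reference, a hedged sketch of saving the models manually from the shell on a running server (exact options may differ; see the reference for the SAVE STATE statement):
twitter> SAVE STATE age_model;
twitter> SAVE STATE gender_model;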
Evaluation¶
Evaluation tools are being developed.
Online Training¶
All machine learning algorithms provided by Jubatus are online algorithms, that is, models can be trained incrementally every time new training data is given. In contrast to online algorithms, batch algorithms require all training data for each training run. Since online machine learning algorithms don't have to store training data locally, they can train models from streaming data.
If training data can be obtained by simple rules, training and classification can be applied to streaming data concurrently in the same SensorBee server. In other words, a UDS can be used for training and classification.
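A hedged sketch of what that could look like, reusing names from this tutorial and assuming a hypothetical stream gender_training_tweets whose tuples already carry a gender label and a feature_vector:
-- train the model with labeled tuples...
CREATE SINK gender_model_trainer TYPE uds WITH name = "gender_model";
INSERT INTO gender_model_trainer FROM gender_training_tweets;
-- ...while classifying other tuples with the same model
CREATE STREAM gender_labeled AS
    SELECT RSTREAM
        juba_classified_label(jubaclassify("gender_model", feature_vector)) AS gender, *
    FROM fv_tweets [RANGE 1 TUPLES];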
The BQL Language¶
This part describes the use of the BQL language in SensorBee. It starts by describing the general syntax of BQL, then explains how to create the structures for data input/output and stateful operations. After that, the general processing model and the remaining BQL query types are explained. Finally, a list of operators and functions that can be used in BQL expressions is provided.
BQL Syntax¶
Lexical Structure¶
BQL has been designed to be easy to learn for people who have used SQL before. While keywords and commands differ in many cases, the basic structure, set of tokens, operators etc. is the same. For example, the following is (syntactically) valid BQL input:
SELECT RSTREAM given_name, last_name FROM persons [RANGE 1 TUPLES] WHERE age > 20;
CREATE SOURCE s TYPE fluentd WITH host="example.com", port=12345;
INSERT INTO file FROM data;
This is a sequence of three commands, one per line (although this is not required; more than one command can be on a line, and commands can span multiple lines where required). Additionally, comments can occur in BQL input. They are effectively equivalent to whitespace.
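For example, the same input could also be written as follows (a sketch reusing the statements above; comments are treated like whitespace):
SELECT RSTREAM given_name, last_name -- pick two fields
    FROM persons [RANGE 1 TUPLES]
    WHERE age > 20;
CREATE SOURCE s TYPE fluentd WITH host="example.com", port=12345; INSERT INTO file FROM data;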
The type of commands that can be used in BQL is described in Input/Output/State Definition and Queries.
Identifiers and Keywords¶
Tokens such as SELECT
, CREATE
, or INTO
in the example above are examples of keywords, that is, words that have a fixed meaning in the BQL language.
The tokens persons
and file
are examples of identifiers.
They identify names of streams, sources, or other objects, depending on the command they are used in.
Therefore they are sometimes simply called “names”.
Keywords and identifiers have the same lexical structure, meaning that one cannot know whether a token is an identifier or a keyword without knowing the language.
BQL identifiers and keywords must begin with a letter (a-z). Subsequent characters can be letters, underscores, or digits (0-9). Keywords and unquoted identifiers are in general case insensitive.
However, there is one important difference between SQL and BQL when it comes to "column identifiers". In BQL, there are no "columns" with names that the user can pick herself, but "field selectors" that describe the path to a value in a JSON-like document imported from outside the system. Therefore field selectors are case-sensitive (in order to be able to deal with input of the form {"a": 1, "A": 2}) and there is also a form that allows the use of special characters; see Field Selectors for details.
Note
There is a list of reserved words that cannot be used as identifiers to avoid confusion. This list can be found at https://github.com/sensorbee/sensorbee/blob/master/core/reservedwords.go. However, this restriction does not apply to field selectors.
Constants¶
There are multiple kinds of implicitly-typed constants in BQL: strings, decimal numbers (with and without fractional part) and booleans. Constants can also be specified with explicit types, which can enable more accurate representation and more efficient handling by the system. These alternatives are discussed in the following subsections.
String Constants¶
A string constant in BQL is an arbitrary sequence of characters bounded by double quotes ("), for example "This is a string". To include a double-quote character within a string constant, write two adjacent double quotes, e.g., "Dianne""s horse". No escaping for special characters is supported at the moment, but any valid UTF-8 encoded byte sequence can be used. See the string data type reference for details.
Numeric Constants¶
There are two different numeric data types in BQL, int and float, representing decimal numbers without and with a fractional part, respectively.
An int constant is written as
[-]digits
A float constant is written as
[-]digits.digits
Scientific notation (1e+10) as well as Infinity and NaN cannot be used in BQL statements.
Some examples of valid numeric constants:
42
3.5
-36
See the type references for int and float for details.
Note
For some operations/functions it makes a difference whether int or float is used (e.g., 2/3 is 0, but 2.0/3 is 0.666666). Be aware of that when writing constants in BQL statements.
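Using the EVAL statement described later in this part, the difference can be checked directly:
EVAL 2 / 3;    -- integer division, results in 0
EVAL 2.0 / 3;  -- floating point division, results in 0.666666...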
Boolean Constants¶
There are two keywords for the two possible boolean values, namely true and false. See the bool data type reference for details.
Operators¶
An operator is a sequence of the items from the following list:
+
-
*
/
<
>
=
!
%
See the chapter on Operators for the complete list of operators in BQL. There are no user-defined operators at the moment.
Special Characters¶
Some characters that are not alphanumeric have a special meaning that is different from being an operator. Details on the usage can be found at the location where the respective syntax element is described. This section only points out the existence of these characters and summarizes their purposes.
- Parentheses (()) have their usual meaning to group expressions and enforce precedence. In some cases parentheses are required as part of the fixed syntax of a particular BQL command.
- Brackets ([]) are used in Array Constructors and in Field Selectors, as well as in Stream-to-Relation Operators.
- Curly brackets ({}) are used in Map Constructors.
- Commas (,) are used in some syntactical constructs to separate the elements of a list.
- The semicolon (;) terminates a BQL command. It cannot appear anywhere within a command, except within a string constant or quoted identifier.
- The colon (:) is used to separate stream names and field selectors, and within field selectors to select array slices (see Extended Descend Operators).
- The asterisk (*) is used in some contexts to denote all the fields of a table row (see Notes on Wildcards). It also has a special meaning when used as the argument of an aggregate function, namely that the aggregate does not require any explicit parameter.
- The period (.) is used in numeric constants and to denote descend in field selectors.
Comments¶
A comment is a sequence of characters beginning with double dashes and extending to the end of the line, e.g.:
-- This is a standard BQL comment
C-style (multi-line) comments cannot be used.
Operator Precedence¶
The following table shows the operator precedence in BQL:
Operator/Element | Description |
---|---|
:: | typecast |
- | unary minus |
* / % | multiplication, division, modulo |
+ - | addition, subtraction |
IS | IS NULL etc. |
(any other operator) | e.g., || |
= != <> <= < >= > | comparison operator |
NOT | logical negation |
AND | logical conjunction |
OR | logical disjunction |
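For example, the rules above imply the following groupings (shown here with the EVAL statement, which is described later in this part):
EVAL 1 + 2 * 3;            -- parsed as 1 + (2 * 3), i.e., 7
EVAL NOT false AND false;  -- parsed as (NOT false) AND false, i.e., false
EVAL 1 + 1 = 2 OR false;   -- parsed as ((1 + 1) = 2) OR false, i.e., true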
Value Expressions¶
Value expressions are used in a variety of contexts, such as in the target list or filter condition of the SELECT command. The expression syntax allows the calculation of values from primitive parts using arithmetic, logical, set, and other operations.
A value expression is one of the following:
- A constant or literal value
- A field selector
- A row metadata reference
- An operator invocation
- A function call
- An aggregate expression
- A type cast
- An array constructor
- A map constructor
- Another value expression in parentheses (used to group subexpressions and override precedence)
The first option was already discussed in Constants. The following sections discuss the remaining options.
Field Selectors¶
In SQL, each table has a well-defined schema with columns, column names and column types. Therefore, a column name is enough to check whether that column exists, what type it has and if the type that will be extracted matches the type expected by the surrounding expression.
In BQL, each row corresponds to a JSON-like object, i.e., a map with string keys and values that have one of several data types (see Data Types and Conversions). In particular, nested maps and arrays are commonplace in the data streams used with BQL. For example, a row could look like:
{"ids": [3, 17, 21, 5],
"dists": [
{"other": "foo", "value": 7},
{"other": "bar", "value": 3.5}
],
"found": true}
To deal with such nested data structures, BQL uses a subset of JSON Path to address values in a row.
Basic Descend Operators¶
In general, a JSON Path describes a path to a certain element of a JSON document. Such a document is looked at as a rooted tree and each element of the JSON Path describes how to descend from the current node to a node one level deeper, with the start node being the root. The basic rules are:
- If the current node is a map, then .child_key or ["child_key"] mean "descend to the child node with the key child_key". The second form must be used if the key name has a non-identifier shape (e.g., contains spaces, dots, brackets or similar). It is an error if the current node is not a map. It is an error if the current node does not have such a child node.
- If the current node is an array, then [k] means "descend to the (zero-based) \(k\)-th element in the array". Negative indices count from the end of the array (as in Python). It is an error if the current node is not an array. It is an error if the given index is out of bounds.
The first element of a JSON Path must always be a “map access” component (since the document is always a map) and the leading dot must be omitted.
For example, ids[1] in the document given above would return 17, dists[-2].other would return foo, and just dists would return the array [{"other": "foo", "value": 7}, {"other": "bar", "value": 3.5}].
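As a sketch of how such selectors are used in a statement (assuming a stream named input that emits documents shaped like the one above):
SELECT RSTREAM ids[1] AS second_id, dists[-2].other AS other, found
FROM input [RANGE 1 TUPLES];
-- would emit {"second_id": 17, "other": "foo", "found": true}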
Extended Descend Operators¶
There is limited support for array slicing and recursive descend:
- If the current node is a map or an array, then ..child_key returns an array of all values below the current node that have the key child_key. However, once a node with key child_key has been found, it will be returned as is, even if it may possibly itself contain that key again. This selector cannot be used as the first component of a JSON Path. It is an error if the current node is not a map or an array. It is not an error if there is no child element with the given key.
- If the current node is an array, then [start:end] returns an array of all values with indexes in the range \([\text{start}, \text{end}-1]\). One or both of start and end can be omitted, meaning "from the first element" and "until the last element", respectively. [start:end:step] returns an array of all elements with the indexes \([\text{start}, \text{start}+\text{step}, \text{start}+2\cdot\text{step}, \ldots, \text{end}-1]\) if step is positive, or \([\text{start}, \text{start}+\text{step}, \text{start}+2\cdot\text{step}, \ldots, \text{end}+1]\) if it is negative. (This description is only true for positive indices, but in fact also negative indices can be used, again counting from the end of the array.) In general, the behavior has been implemented to be very close to Python's list slicing. These selectors cannot be used as the first component of a JSON Path. It is an error if it can be decided independently of the input data that the specified values do not make sense (e.g., step is 0, or end is larger than start but step is negative), but slices that will always be empty (e.g., [2:2]) are valid. Also, if it depends on the input data whether a slice specification is valid or not (e.g., [4:-4]), it is not an error, but an empty array is returned.
- If the slicing or recursive descend operators are followed by ordinary JSON Path operators as described before, their meaning changes to "... for every element in the array". For example, list[1:3].foo has the same result as [list[1].foo, list[2].foo, list[3].foo] (except that the latter would fail if list is not long enough) or a Python list comprehension such as [x.foo for x in list[1:3]]. However, it is not possible to chain multiple list-returning operators: list[1:3]..foo or foo..bar..hoge are invalid.
Examples¶
Given the input data
{
"foo": [
{"hoge": [
{"a": 1, "b": 2},
{"a": 3, "b": 4} ],
"bar": 5},
{"hoge": [
{"a": 5, "b": 6},
{"a": 7, "b": 8} ],
"bar": 2},
{"hoge": [
{"a": 9, "b": 10} ],
"bar": 8}
],
"nantoka": {"x": "y"}
}
the following table illustrates the effect of various JSON Path expressions.
Path | Result |
---|---|
nantoka | {"x": "y"} |
nantoka.x | "y" |
nantoka["x"] | "y" |
foo[0].bar | 5 |
foo[0].hoge[-1].a | 3 |
["foo"][0]["hoge"][-1]["a"] | 3 |
foo[1:2].bar | [2, 8] |
foo..bar | [5, 2, 8] |
foo..hoge[0].b | [2, 6, 10] |
Row Metadata References¶
Metadata is the data that is attached to a tuple but which cannot be accessed as part of the normal row data.
Tuple Timestamp¶
At the moment, the only metadata that can be accessed from within BQL is a tuple’s system timestamp (the time that was set by the source that created it).
This timestamp can be accessed using the ts() function. If multiple streams are joined, a stream prefix is required to identify the input tuple that is referred to, i.e.,
stream_name:ts()
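For example, the following adds a tuple's timestamp to the output (the stream events and its field uid are assumed for illustration; with only one input stream, the prefix can be omitted):
SELECT RSTREAM ts(), uid FROM events [RANGE 1 TUPLES];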
Operator Invocations¶
There are three possible syntaxes for an operator invocation:
expression operator expression
operator expression
expression operator
See the section Operators for details.
Function Calls¶
The syntax for a function call is the name of a function, followed by its argument list enclosed in parentheses:
function_name([expression [, expression ... ]])
For example, the following computes the square root of 2:
sqrt(2);
The list of built-in functions is described in section Functions.
Aggregate Expressions¶
An aggregate expression represents the application of an aggregate function across the rows selected by a query. An aggregate function reduces multiple inputs to a single output value, such as the sum or average of the inputs. The syntax of an aggregate expression is the following:
function_name(expression [, ... ] [ order_by_clause ])
where function_name is a previously defined aggregate function and expression is any value expression that does not itself contain an aggregate expression. The optional order_by_clause is described below.
In BQL, aggregate functions can take aggregate and non-aggregate parameters. For example, the string_agg function can be called like
string_agg(name, ", ")
to return a comma-separated list of all names in the respective group. However, the second parameter is not an aggregation parameter, so for a statement like
SELECT RSTREAM string_agg(name, sep) FROM ...
sep must be mentioned in the GROUP BY clause.
For many aggregate functions (e.g., sum or avg), the order of items in the group does not matter. However, for other functions (e.g., string_agg) the user has certain expectations with respect to the order in which items should be fed into the aggregate function. In this case, the order_by_clause with the syntax
ORDER BY expression [ASC | DESC] [ , expression [ASC | DESC] ... ]
can be used. The rows that are fed into the aggregate function are sorted by the values of the given expression in ascending (default) or descending mode. For example,
string_agg(first_name || " " || last_name, "," ORDER BY last_name)
will create a comma-separated list of names, ordered ascending by the last name.
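Put together, such an aggregate expression is typically used in a grouped SELECT statement like the following sketch (the stream people and its fields are assumed for illustration):
SELECT RSTREAM gender,
       string_agg(first_name || " " || last_name, "," ORDER BY last_name) AS names
FROM people [RANGE 10 TUPLES]
GROUP BY gender;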
See Aggregate Functions for a list of built-in aggregate functions.
Type Casts¶
A type cast specifies a conversion from one data type to another. BQL accepts two equivalent syntaxes for type casts:
CAST(expression AS type)
expression::type
When a cast is applied to a value expression, it represents a run-time type conversion. The cast will succeed only if a suitable type conversion operation has been defined, see Conversions.
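Both forms can be tried interactively with the EVAL statement described later in this part; the conversions used here are covered in Data Types and Conversions:
EVAL CAST("3.14" AS float);              -- => 3.14
EVAL 1::string;                          -- => "1"
EVAL "2016-01-18T09:22:40Z"::timestamp;  -- RFC3339 string to timestamp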
Array Constructors¶
An array constructor is an expression that builds an array value using values for its member elements.
A simple array constructor consists of a left square bracket [, a comma-separated list of expressions for the array element values, and finally a right square bracket ]. For example:
SELECT RSTREAM [7, 2 * stream:a, true, "blue"] FROM ...
Each element of the array can have a different type. In particular, the wildcard is also allowed as an expression and will include the whole current row (i.e., a map) as an array element.
Note
Single-element arrays of strings could also be interpreted as JSON Paths and are therefore required to have a trailing comma after their only element: ["foo",]
Map Constructors¶
A map constructor is an expression that builds a map value using string keys and arbitrary values for its member elements.
A simple map constructor consists of a left curly bracket {, a comma-separated list of "key": value pairs for the map elements, and finally a right curly bracket }.
For example:
SELECT RSTREAM {"a_const": 7, "prod": 2 * stream:a} FROM ...
The keys must be string literals (i.e., they cannot be computed expressions); in particular they must be written using double quotes. The values can be arbitrary expressions, including a wildcard.
Expression Evaluation Rules¶
The order of evaluation of subexpressions is not defined. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order.
Furthermore, if the result of an expression can be determined by evaluating only some parts of it, then other subexpressions might not be evaluated at all. For instance, if one wrote:
true OR somefunc()
then somefunc() would (probably) not be called at all.
The same would be the case if one wrote:
somefunc() OR true
Note that this is not the same as the left-to-right “short-circuiting” of Boolean operators that is found in some programming languages.
Calling Functions¶
BQL allows functions to be called using only the positional notation. In positional notation, a function call is written with its argument values in the same order as they are defined in the function declaration. Therefore, while some parameters of a function can be optional, these parameters can only be omitted at the end of the parameter list.
For example,
log(100)
log(100, 2)
are both valid function calls computing the logarithm of a number. The first one uses the default value 10 for the logarithm base, the second one uses the given value 2.
Input/Output/State Definition¶
To process streams of data, that data needs to be imported into SensorBee and the processing results have to be exported from it. This chapter introduces input and output components in BQL. It also describes how BQL supports stateful data processing using user-defined states (UDSs).
Data Input¶
BQL inputs a stream of data using a source. A source receives data defined and generated outside SensorBee, converts that data into tuples, and finally emits tuples for further processing. This section describes how a source can be created, operated, and dropped.
Creating a Source¶
A source can be created using the CREATE SOURCE statement.
CREATE SOURCE logs TYPE file WITH path = "access_log.jsonl";
In this example, a source named logs is created and it has the type file. The file source type has a required parameter called path. The parameter is specified in the WITH clause. Once a source is created, other components described later can read tuples from the source and compute results based on them.
When multiple parameters are required, they should be separated by commas:
CREATE SOURCE src TYPE some_type
WITH param1 = val1, param2 = val2, param3 = val3;
Each source type has its own parameters and there is no parameter that is common to all source types.
Source types can be registered to the SensorBee server as plugins. To learn how to develop and register a source plugin, see Source Plugins.
Built-in Sources¶
BQL has a number of built-in source types.
file
The file type provides a source that inputs tuples from an existing file.
node_statuses
The node_statuses source periodically emits tuples with information about nodes in a topology. The status includes connected nodes, the number of tuples emitted from or written to the node, and so on.
edge_statuses
The edge_statuses source periodically emits tuples with information about each edge (a.k.a. pipe) that connects a pair of nodes. Although this information is contained in tuples emitted from the node_statuses source, the edge_statuses source provides a more edge-centric view of I/O statuses.
dropped_tuples
The dropped_tuples source emits tuples dropped from a topology. It only reports once per tuple. Tuples are often dropped from a topology because a source or a stream is not connected to any other node, or because a SELECT statement tries to look up a nonexistent field of a tuple.
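As a minimal sketch, a monitoring source is created and consumed like any other source; type-specific parameters (e.g., an emission interval), if required by the source type, are omitted here:
CREATE SOURCE stats TYPE node_statuses;
CREATE STREAM node_status_log AS
  SELECT RSTREAM * FROM stats [RANGE 1 TUPLES];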
Pausing and Resuming a Source¶
By default, a source starts emitting tuples as soon as it is created. Adding the PAUSED keyword to the CREATE SOURCE statement creates a source that is paused on startup:
CREATE PAUSED SOURCE logs TYPE file WITH path = "access_log.jsonl";
The RESUME SOURCE statement makes a paused source emit tuples again:
RESUME SOURCE logs;
The statement takes the name of the source to be resumed.
A source can be paused after it is created by the PAUSE SOURCE statement:
PAUSE SOURCE logs;
The statement also takes the name of the source to be paused.
Not all sources support the PAUSE SOURCE and RESUME SOURCE statements. Issuing these statements to such sources results in an error.
Rewinding a Source¶
Some sources can be rewound, that is, they emit tuples again starting from the beginning. The REWIND SOURCE statement rewinds a source if the source supports the statement:
REWIND SOURCE logs;
The statement takes the name of the source to be rewound. Issuing the statement to sources that don’t support rewinding results in an error.
Dropping a Source¶
The DROP SOURCE statement drops (i.e. removes) a source from a topology:
DROP SOURCE logs;
The statement takes the name of the source to be dropped. Other nodes in a topology cannot refer to the source once it's dropped. Also, nodes connected to the source may be stopped in a cascading manner when the source gets dropped.
Data Output¶
Results of tuple processing need to be emitted to systems or services running outside the SensorBee server in order to work with them as part of a larger system. A sink receives the results of computations performed within the SensorBee server and sends them to the outside world. This section explains how sinks are operated in BQL.
Creating a Sink¶
A sink can be created by the CREATE SINK statement:
CREATE SINK filtered_logs TYPE file WITH path = "filtered_access_log.jsonl";
The statement is very similar to the CREATE SOURCE statement. It takes the name of the new sink, its type, and parameters. Multiple parameters can also be provided as a list separated by commas. Each sink type has its own parameters and there is no parameter that is common to all sink types.
Sink types can also be registered to the SensorBee server as plugins. To learn how to develop and register a sink plugin, see Sink Plugins.
Built-in Sinks¶
BQL has a number of built-in sink types.
file
The file type provides a sink that writes tuples to a file.
stdout
A stdout sink writes output tuples to stdout.
uds
A uds sink passes tuples to user-defined states, which are described later.
Writing Data to a Sink¶
The INSERT INTO statement writes data to a sink:
INSERT INTO filtered_logs FROM filtering_stream;
The statement takes the name of the sink to write to and the name of a source or a stream, which will be described in the following chapters.
Stateful Data Processing¶
SensorBee supports user-defined states (UDSs) to perform stateful streaming data processing. Such processing includes not only aggregates such as counting but also machine learning, adaptive sampling, and so on. In natural language processing, dictionaries or configurations for tokenizers can also be considered as states.
This section describes operations involving UDSs. Use cases of UDSs are described in the tutorials and how to develop a custom UDS is explained in the server programming part.
Creating a UDS¶
A UDS can be created using the CREATE STATE statement:
CREATE STATE age_classifier TYPE jubaclassifier_arow
WITH label_field = "age", regularization_weight = 0.001;
This statement creates a UDS named age_classifier with the type jubaclassifier_arow. It has two parameters: label_field and regularization_weight. Each UDS type has its own parameters and there is no parameter that is common to all UDS types.
A UDS is usually used via user-defined functions (UDFs) that know about the internals of a specific UDS type. See server programming part for details.
Saving a State¶
The SAVE STATE statement persists a UDS:
SAVE STATE age_classifier;
The statement takes the name of the UDS to be saved. After the statement is issued, SensorBee saves the state based on the given configuration. The location and the format of saved data depend on the run-time configuration and are unknown to users.
The SAVE STATE statement may take a TAG to support versioning of the saved data:
SAVE STATE age_classifier TAG initial;
-- or
SAVE STATE age_classifier TAG trained;
When the TAG clause is omitted, default will be the default tag name.
Loading a State¶
The LOAD STATE statement loads a UDS that was previously saved with the SAVE STATE statement:
LOAD STATE age_classifier TYPE jubaclassifier_arow;
The statement takes the name of the UDS to be loaded and its type name.
The LOAD STATE statement may also take a TAG:
LOAD STATE age_classifier TYPE jubaclassifier_arow TAG initial;
-- or
LOAD STATE age_classifier TYPE jubaclassifier_arow TAG trained;
The UDS needs to have been saved with the specified tag before. When the TAG clause is omitted, it is the same as:
LOAD STATE age_classifier TYPE jubaclassifier_arow TAG default;
The LOAD STATE statement fails if no saved state with the given name and type exists. In that case, to avoid failure and instead create a new "empty" instance, the OR CREATE IF NOT SAVED clause can be added:
LOAD STATE age_classifier TYPE jubaclassifier_arow
OR CREATE IF NOT SAVED
WITH label_field = "age", regularization_weight = 0.001;
If there is a saved state, this statement will load it; otherwise it will create a new state with the given parameters. This variant, too, can be used with the TAG clause:
LOAD STATE age_classifier TYPE jubaclassifier_arow TAG trained
OR CREATE IF NOT SAVED
WITH label_field = "age", regularization_weight = 0.001;
Dropping a State¶
The DROP STATE statement drops a UDS from a topology:
DROP STATE age_classifier;
The statement takes the name of the UDS to be dropped. Once a UDS is dropped, it can no longer be referred to by any statement unless it is cached somewhere.
Queries¶
The previous chapters described how to define data sources and sinks to communicate with the outside world. Now it is discussed how to transform the data stream from those sources and write it to the defined sinks - that is, how to actually process data.
Processing Model¶
Overview¶
The processing model in BQL is similar to what is explained in [cql]. In this model, each tuple in a stream has the shape \((t, d)\), where \(t\) is the original timestamp and \(d\) the data contained.
In order to execute SQL-like queries, a finite set of tuples from the possibly unbounded stream, a relation, is required.
In the processing step at time \(t^*\), a stream-to-relation operator \(R\) that converts a certain set of tuples in the stream to a relation \(R(t^*)\) is used.
This relation is then processed with a relation-to-relation operator \(O\) that is expressed in a form very closely related to an SQL SELECT
statement.
Finally, a relation-to-stream operator \(S\) will emit certain rows from the output relation \(O(R(t^*))\) into the output stream, possibly taking into account the results of the previous execution step \(O(R(t^*_{\text{prev}}))\).
This process is illustrated in the following figure:

This three-step pipeline is executed for each tuple, but only for one tuple at a time. Therefore, during execution there is a well-defined “current tuple”. This also means that if there is no tuple in the input stream for a long time, transformation functions will not be called.
Now the kind of stream-to-relation and relation-to-stream operators that can be used in BQL is explained.
Stream-to-Relation Operators¶
In BQL, there are two different stream-to-relation operators, a time-based one and a tuple-based one.
They are also called “window operators”, since they define a sliding window on the input stream.
In terms of BQL syntax, the window operator is given after a stream name in the FROM clause, within brackets and using the RANGE keyword, for example:
... FROM events [RANGE 5 SECONDS] ...
... FROM data [RANGE 10 TUPLES] ...
... FROM left [RANGE 2 SECONDS], right [RANGE 5 TUPLES] ...
From an SQL point of view, it makes sense to think of stream [RANGE window-spec] as the table to operate on.
The time-based operator is used with a certain time span \(I\) (such as 60 seconds) and at point in time \(t^*\) uses all tuples in the range \([t^*-I, t^*]\) to create the relation \(R(t^*)\).
Valid time spans are positive integer or float values, followed by the SECONDS or MILLISECONDS keyword; for example, [RANGE 3.5 SECONDS] or [RANGE 200 MILLISECONDS] are valid specifications. The maximal allowed values are 86,400 for SECONDS and 86,400,000 for MILLISECONDS, i.e., the maximal window size is one day.
Note
- The point in time \(t^*\) is not the “current time” (however that would be defined), but it is equal to the timestamp of the current tuple. This approach means that a stream can be reprocessed with identical results independent of the system clock of some server. Also it is not necessary to worry about a delay until a tuple arrives in the system and is processed there.
- It is assumed that the tuples in the input stream arrive in the order of their timestamps. If timestamps are out of order, the window contents are not well-defined.
- The sizes of relations \(R(t^*_1)\) and \(R(t^*_2)\) can be different, since there may be more or less tuples in the given time span. However, there is always at least one tuple in the relation (the current one).
The tuple-based operator is used with a number \(k\) and uses the last \(k\) tuples that have arrived (or all tuples that have arrived when this number is less than \(k\)) to create the relation \(R(t^*)\). The example figure above shows a tuple-based window with \(k=3\).
Valid ranges are positive integral values, followed by the TUPLES keyword; for example, [RANGE 10 TUPLES] is a valid specification. The maximal allowed value is 1,048,575.
Note
- The timestamps of tuples do not have any effect with this operator, they can also be out of order. Only the order in which the tuples arrived is important. (Note that for highly concurrent systems, “order” is not always a well-defined term.)
- At the beginning of stream processing, when less than \(k\) tuples have arrived, the size of the relation will be less than \(k\). [1] As soon as \(k\) tuples have arrived, the relation size will be constant.
[1] | Sometimes this leads to unexpected effects or complicated workarounds, while the cases where this is a useful behavior may be few. Therefore this behavior may change in a future version. |
Relation-to-Stream Operators¶
Once a resulting relation \(O(R(t^*))\) is computed, tuples in the relation need to be output as a stream again.
In BQL, there are three relation-to-stream operators: RSTREAM, ISTREAM, and DSTREAM. They are also called "emit operators", since they control how tuples are emitted into the output stream. In terms of BQL syntax, the emit operator keyword is given after the SELECT keyword, for example:
SELECT ISTREAM uid, msg FROM ...
The following subsections describe how each operator works. To illustrate the effects of each operator, a visual example is provided afterwards.
RSTREAM Operator¶
When RSTREAM is specified, all tuples in the relation are emitted. In particular, a combination of RSTREAM with a RANGE 1 TUPLES window operator leads to 1:1 input/output behavior and can be processed by a faster execution plan than general statements.
In contrast,
SELECT RSTREAM * FROM src [RANGE 100 TUPLES];
emits (at most) 100 tuples for every tuple in src.
ISTREAM Operator¶
When ISTREAM is specified, all tuples in the relation that have not been in the previous relation are emitted. (The "I" in ISTREAM stands for "insert".) Here, "previous" refers to the relation that was computed for the tuple just before the current tuple. Therefore the current relation can contain at most one row that was not in the previous relation, and thus ISTREAM can emit at most one row in each run.
In section 4.3.2 of [streamsql], it is highlighted that for the “is contained in previous relation” check, a notion of equality is required; in particular there are various possibilities how to deal with multiple tuples that have the same value. In BQL tuples with the same value are considered equal, so that if the previous relation contains the values \(\{a, b\}\) and the current relation contains the values \(\{b, a\}\), then nothing is emitted. However, multiplicities are respected, so that if the previous relation contains the values \(\{b, a, b, a\}\) and the current relation contains \(\{a, b, a, a\}\), then one \(a\) is emitted.
As an example for a typical use case,
SELECT ISTREAM * FROM src [RANGE 1 TUPLES];
will drop subsequent duplicates, i.e., emit only the first occurrence of a series of tuples with identical values.
To illustrate the multiplicity counting,
SELECT ISTREAM 1 FROM src [RANGE 3 TUPLES];
will emit \(1\) three times and then nothing (because after the first three tuples have been processed, both the previous and the current relation always look like \(\{1, 1, 1\}\)).
DSTREAM Operator¶
The DSTREAM operator is very similar to ISTREAM, except that it emits all tuples in the previous relation that are not also contained in the current relation. (The "D" in DSTREAM stands for "delete".) Just as for ISTREAM, equality is computed using value comparison and multiplicity counting is used: if the previous relation contains the values \(\{a, a, b, a\}\) and the current relation contains \(\{b, b, a, a\}\), then one \(a\) is emitted.
As an example for a typical use case,
SELECT DSTREAM * FROM src [RANGE 1 TUPLES];
will emit only the last occurrence of a series of tuples with identical values.
To illustrate the multiplicity counting,
SELECT DSTREAM 1 FROM src [RANGE 3 TUPLES];
will never emit anything.
Examples¶
To illustrate the difference between the three emit operators, a concrete example shall be presented.
Consider the following statement (where *STREAM is a placeholder for one of the emit operators):
SELECT *STREAM id, price FROM stream [RANGE 3 TUPLES] WHERE price < 8;
This statement just takes the id and price key-value pairs of every tuple and outputs them untransformed.
In the first table below, the leftmost column shows the data of the current tuple in the stream, next to that the contents of the current window \(R(t^*)\), and then the result of the relation-to-relation operator \(O(R(t^*))\). The second table lists the items that would be emitted by the respective emit operator at each step.
Current Tuple's Data | Current Window \(R(t^*)\) (last three tuples) | Output Relation \(O(R(t^*))\) |
---|---|---|
{"id": 1, "price": 3.5} | {"id": 1, "price": 3.5} | {"id": 1, "price": 3.5} |
{"id": 2, "price": 4.5} | {"id": 1, "price": 3.5}, {"id": 2, "price": 4.5} | {"id": 1, "price": 3.5}, {"id": 2, "price": 4.5} |
{"id": 3, "price": 10.5} | {"id": 1, "price": 3.5}, {"id": 2, "price": 4.5}, {"id": 3, "price": 10.5} | {"id": 1, "price": 3.5}, {"id": 2, "price": 4.5} |
{"id": 4, "price": 8.5} | {"id": 2, "price": 4.5}, {"id": 3, "price": 10.5}, {"id": 4, "price": 8.5} | {"id": 2, "price": 4.5} |
{"id": 5, "price": 6.5} | {"id": 3, "price": 10.5}, {"id": 4, "price": 8.5}, {"id": 5, "price": 6.5} | {"id": 5, "price": 6.5} |
RSTREAM | ISTREAM | DSTREAM |
---|---|---|
{"id": 1, "price": 3.5} | {"id": 1, "price": 3.5} | |
{"id": 1, "price": 3.5}, {"id": 2, "price": 4.5} | {"id": 2, "price": 4.5} | |
{"id": 1, "price": 3.5}, {"id": 2, "price": 4.5} | | |
{"id": 2, "price": 4.5} | | {"id": 1, "price": 3.5} |
{"id": 5, "price": 6.5} | {"id": 5, "price": 6.5} | {"id": 2, "price": 4.5} |
[cql] | Arasu et al., “The CQL Continuous Query Language: Semantic Foundations and Query Execution”, http://ilpubs.stanford.edu:8090/758/1/2003-67.pdf |
[streamsql] | Jain et al., “Towards a Streaming SQL Standard”, http://cs.brown.edu/~ugur/streamsql.pdf |
Selecting and Transforming Data¶
In the previous section, it was explained how BQL converts stream data into relations and back. This section is about how this relational data can be selected and transformed. This functionality is exactly what SQL's SELECT statement was designed for, and so BQL mimics the SELECT syntax of SQL as much as possible. (Some basic knowledge of what the SQL SELECT statement does is assumed.) However, as opposed to the SQL data model, BQL's input data is assumed to be JSON-like, i.e., with varying shapes, nesting levels, and data types; therefore the BQL SELECT statement has a number of small differences from SQL's SELECT.
Overview¶
The general syntax of the SELECT command is
SELECT emit_operator select_list FROM table_expression;
The emit_operator is one of the operators described in Relation-to-Stream Operators. The following subsections describe the details of select_list and table_expression.
Table Expressions¶
A table expression computes a table. The table expression contains a FROM clause that is optionally followed by WHERE, GROUP BY, and HAVING clauses:
... FROM table_list [WHERE filter_expression]
[GROUP BY group_list] [HAVING having_expression]
The FROM Clause¶
The FROM clause derives a table from one or more other tables given in a comma-separated table reference list.
FROM table_reference [, table_reference [, ...]]
In SQL, each table_reference is (in the simplest possible case) an identifier that refers to a pre-defined table, e.g., FROM users or FROM names, addresses, cities are valid SQL FROM clauses. In BQL, only streams have identifiers, so in order to get a well-defined relation, a window specifier as explained in Stream-to-Relation Operators must be added. In particular, the SQL examples just given are not valid in BQL, but the following are:
FROM users [RANGE 10 TUPLES]
FROM names [RANGE 2 TUPLES], addresses [RANGE 1.5 SECONDS], cities [RANGE 200 MILLISECONDS]
BQL also knows "user-defined stream-generating functions" (UDSFs) that transform a stream into another stream and can be used, for example, to output multiple output rows per input row; something that is not possible with standard SELECT features. (These are similar to "Table Functions" in PostgreSQL.) Such UDSFs can also be used in the FROM clause: instead of using a stream's identifier, use the function call syntax function(param, param, ...) with the UDSF name as the function name and the base stream's identifier as a parameter (as a string, i.e., in double quotes), possibly with other parameters. For example, if there is a UDSF called duplicate that takes the input stream's name as the first parameter and the number of copies of each input tuple as the second, this would look as follows:
FROM duplicate("products", 3) [RANGE 10 SECONDS]
If more than one table reference is listed in the FROM clause, the tables are cross-joined (that is, the Cartesian product of their rows is formed). The syntax table1 JOIN table2 ON (...) is not supported in BQL. The result of the FROM list is an intermediate virtual table that can then be subject to transformations by the WHERE, GROUP BY, and HAVING clauses and is finally the result of the overall table expression.
A temporary name can be given to tables and complex table references to be used for references to the derived table in the rest of the query. This is called a “table alias”. To create a table alias, write
FROM table_reference AS alias
The use of table aliases is optional, but helps to shorten statements.
By default, each table can be addressed using the stream name or the UDSF name, respectively. Therefore, table aliases are only mandatory if the same stream/UDSF is used multiple times in a join. Taking aliases into account, each name must uniquely refer to one table. FROM stream [RANGE 1 TUPLES], stream [RANGE 2 TUPLES] or FROM streamA [RANGE 1 TUPLES], streamB [RANGE 2 TUPLES] AS streamA are not valid, but FROM stream [RANGE 1 TUPLES] AS streamA, stream [RANGE 2 TUPLES] AS streamB and also FROM stream [RANGE 1 TUPLES], stream [RANGE 2 TUPLES] AS other are.
The WHERE Clause¶
The syntax of the WHERE clause is
WHERE filter_expression
where filter_expression is any expression with a boolean value. (That is, WHERE 6 is not a valid filter, but WHERE 6::bool is.)
After the processing of the FROM clause is done, each row of the derived virtual table is checked against the search condition. If the result of the condition is true, the row is kept in the output table, otherwise (i.e., if the result is false or null) it is discarded. The search condition typically references at least one column of the table generated in the FROM clause; this is not required, but otherwise the WHERE clause would be fairly useless.
As BQL does not support the table1 JOIN table2 ON (condition) syntax, any join condition must always be given in the WHERE clause, as in the example below.
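For example, a join with its condition in the WHERE clause could look like this (the streams orders and users and their fields are assumed for illustration):
SELECT RSTREAM orders:id, users:name
FROM orders [RANGE 1 TUPLES], users [RANGE 100 TUPLES]
WHERE orders:user_id = users:id;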
The GROUP BY and HAVING Clauses¶
After passing the WHERE filter, the derived input table might be subject to grouping, using the GROUP BY clause, and elimination of group rows, using the HAVING clause. They basically have the same semantics as explained in the PostgreSQL Documentation, section 7.2.3.
One current limitation of BQL row grouping is that only simple columns can be used in the GROUP BY list; no complex expressions are allowed. For example, GROUP BY round(age/10) cannot be used in BQL at the moment.
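For example, a grouping query that stays within this limitation could look as follows (the stream purchases and its category field are assumed for illustration):
SELECT RSTREAM category, count(*) AS cnt
FROM purchases [RANGE 60 SECONDS]
GROUP BY category
HAVING count(*) > 10;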
Select Lists¶
As shown in the previous section, the table expression in the SELECT command constructs an intermediate virtual table by possibly combining tables, views, eliminating rows, grouping, etc. This table is finally passed on to processing by the “select list”. The select list determines which elements of the intermediate table are actually output.
Select-List Items¶
As in SQL, the select list contains a number of comma-separated expressions:
SELECT emit_operator expression [, expression] [...] FROM ...
In general, items of a select list can be arbitrary Value Expressions. In SQL, tables are strictly organized in “rows” and “columns” and the most important elements in such expressions are therefore column references.
In BQL, each input tuple can be considered a “row”, but the data can also be unstructured and the notion of a “column” is not sufficient.
(In fact, each row corresponds to a map object.)
Therefore, BQL uses JSON Path to address data in each row.
If only one table is used in the FROM clause and only top-level keys of each JSON-like row are referenced, the BQL select list looks the same as in SQL:
SELECT RSTREAM a, b, c FROM input [RANGE 1 TUPLES];
If the input data has the form {"a": 7, "b": "hello", "c": false}, then the output will look exactly the same. However, JSON Path allows access to nested elements as well:
SELECT RSTREAM a.foo.bar FROM input [RANGE 1 TUPLES];
If the input data has the form {"a": {"foo": {"bar": 7}}}, then the output will be {"col_0": 7}. (See the paragraph Column Labels below for details on output key naming, and the section Field Selectors for details about the available syntax for JSON Path expressions.)
Where SQL uses the dot in SELECT left.a, right.b to specify the table from which to use a column, JSON Path uses the dot to describe a child relation in a single JSON element as shown above. Therefore, to avoid ambiguity, BQL uses the colon (:) character to separate table and JSON Path:
SELECT RSTREAM left:foo.bar, right:hoge FROM ...
If there is just one table to select from, the table prefix can be omitted, but then it must be omitted in all expressions of the statement. If there are multiple tables in the FROM clause, then table prefixes must be used.
Column Labels¶
The result value of every expression in the select list will be assigned to a key in the output row.
If not explicitly specified, these output keys will be "col_0"
, "col_1"
, etc. in the order the expressions were specified in the select list.
However, in some cases a more meaningful output key is chosen by default, as already shown above:
- If the expression is a single top-level key (like
a
), then the output key will be the same. - If the expression is a simple function call (like
f(a)
), then the output key will be the function name. - If the expression refers the timestamp of a tuple in a stream (using the
stream:ts()
syntax), then the output key will bets
. - If the expression is the wildcard (
*
), then the input will be copied, i.e., all keys from the input document will be present in the output document.
The output key can be overridden by specifying an ... AS output_key clause after an expression. For the example above,
SELECT RSTREAM a.foo.bar AS x FROM input [RANGE 1 TUPLES];
will result in an output row that has the shape {"x": 7} instead of {"col_0": 7}.
Note that it is possible to use the same column label multiple times, but in this case it is undefined which of the values with the same alias will end up in that output key.
To place values at other places than the top level of an output row map, a subset of the JSON Path syntax described in Field Selectors can be used for column labels as well. Where such a selector describes the position in a map uniquely, the value will be placed at that location. For the input data example above,
SELECT RSTREAM a.foo.bar AS x.y[3].z FROM input [RANGE 1 TUPLES];
will result in an output document with the following shape:
{"x": {"y": [null, null, null, {"z": 7}]}}
That is, a string child_key in the column label hierarchy will assume a map at the corresponding position and put the value in that map using child_key as a key; a numeric index [n] will assume an array and put the value in the n-th position, padded with NULL items before if required. Negative list indices cannot be used. Also, Extended Descend Operators cannot be used.
It is safe to assign multiple values to non-overlapping locations of an output row created this way, as shown below:
SELECT RSTREAM 7 AS x.y[3].z, "bar" AS x.foo, 17 AS x.y[0]
FROM input [RANGE 1 TUPLES];
This will create the following output row:
{"x": {"y": [17, null, null, {"z": 7}], "foo": "bar"}}
However, as the order in which the items of the select list are processed is not defined, it is not safe to override values placed by one select list item from another select list item. For example,
SELECT RSTREAM [1, 2, 3] AS x, 17 AS x[1] ...
does not guarantee a particular output. Also, statements such as
SELECT RSTREAM 1 AS x.y, 2 AS x[1] ...
will lead to errors because x cannot be a map and an array at the same time.
Notes on Wildcards¶
In SQL, the wildcard (*) can be used as a shorthand expression for all columns of an input table. However, due to the strong typing in SQL's data model, name and type conflicts can still be checked at the time the statement is analyzed. In BQL's data model, there is no strong typing, therefore the wildcard operator must be used with a bit of caution. For example, in
SELECT RSTREAM * FROM left [RANGE 1 TUPLES], right [RANGE 1 TUPLES];
if the data in the left stream looks like {"a": 1, "b": 2} and the data in the right stream looks like {"b": 3, "c": 4}, then the output document will have the keys a, b, and c, but the value of the b key is undefined.
To select all keys from only one stream, the colon notation (stream:*) as introduced above can be used.
The wildcard can be used with a column alias as well. The expression * AS foo will nest the input document under the given key foo, i.e., the input {"a": 1, "b": 2} is transformed to {"foo": {"a": 1, "b": 2}}. On the other hand, it is also possible to use the wildcard as an alias, as in foo AS *. This has the opposite effect, i.e., it takes the contents of the foo key (which must be a map itself) and pulls them up to the top level, i.e., {"foo": {"a": 1, "b": 2}} is transformed to {"a": 1, "b": 2}.
Note that any name conflicts that arise due to the use of the wildcard operator (e.g., in *, a:*, b:*, or foo AS *, bar AS *) lead to undefined values in the column with the conflicting name. However, if there is an explicitly specified output key, this will always be prioritized over a key originating from a wildcard expression.
However, if there is an explicitly specified output key, this will always be prioritized over a key originating from a wildcard expression.
Examples¶
Select List | Input Row | Output Row |
---|---|---|
a | {"a": 1, "b": 2} | {"a": 1} |
a, b | {"a": 1, "b": 2} | {"a": 1, "b": 2} |
a + b | {"a": 1, "b": 2} | {"col_0": 3} |
a, a + b | {"a": 1, "b": 2} | {"a": 1, "col_1": 3} |
* | {"a": 1, "b": 2} | {"a": 1, "b": 2} |
Examples with two input streams l and r:
Select List | Input Row (l) | Input Row (r) | Output Row |
---|---|---|---|
l:a | {"a": 1, "b": 2} | {"c": 3, "d": 4} | {"a": 1} |
l:a, r:c | {"a": 1, "b": 2} | {"c": 3, "d": 4} | {"a": 1, "c": 3} |
l:a + r:c | {"a": 1, "b": 2} | {"c": 3, "d": 4} | {"col_0": 4} |
l:* | {"a": 1, "b": 2} | {"c": 3, "d": 4} | {"a": 1, "b": 2} |
l:*, r:c AS b | {"a": 1, "b": 2} | {"c": 3, "d": 4} | {"a": 1, "b": 3} |
l:*, r:* | {"a": 1, "b": 2} | {"c": 3, "d": 4} | {"a": 1, "b": 2, "c": 3, "d": 4} |
* | {"a": 1, "b": 2} | {"c": 3, "d": 4} | {"a": 1, "b": 2, "c": 3, "d": 4} |
* | {"a": 1, "b": 2} | {"b": 3, "d": 4} | {"a": 1, "b": (undef.), "d": 4} |
Building Processing Pipelines¶
The SELECT statement as described above returns a data stream (where the transport mechanism depends on the client in use), but often an unattended processing pipeline (i.e., one running on the server without client interaction) needs to be set up. In order to do so, a stream can be created from the results of a SELECT query and then used afterwards like an input stream. (The concept is equivalent to that of an SQL VIEW.)
The statement used to create a stream from a SELECT statement is:
CREATE STREAM stream_name AS select_statement;
For example:
CREATE STREAM odds AS SELECT RSTREAM * FROM numbers [RANGE 1 TUPLES] WHERE id % 2 = 1;
If that statement is issued correctly, subsequent statements can refer to stream_name in their FROM clauses. If a stream thus created is no longer needed, it can be dropped using the DROP STREAM command:
DROP STREAM stream_name;
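Putting the pieces together, a small unattended pipeline can be built from the statements introduced so far; the file names and the id field below are assumptions made only for illustration:
CREATE SOURCE numbers TYPE file WITH path = "numbers.jsonl";
CREATE STREAM odds AS
  SELECT RSTREAM * FROM numbers [RANGE 1 TUPLES] WHERE id % 2 = 1;
CREATE SINK odds_out TYPE file WITH path = "odds.jsonl";
INSERT INTO odds_out FROM odds;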
Expression Evaluation¶
To evaluate expressions outside the context of a stream, the EVAL command can be used. The general syntax is
EVAL expression;
and expression can generally be any expression, but it cannot contain references to any columns, aggregate functions or anything else that only makes sense in a stream processing context.
For example, in the SensorBee Shell, the following can be done:
> EVAL "foo" || "bar";
foobar
Data Types and Conversions¶
This chapter describes data types defined in BQL and how their type conversion works.
Overview¶
BQL has the following data types:
Type name | Description | Example |
---|---|---|
null | Null type | NULL |
bool | Boolean | true |
int | 64-bit integer | 12 |
float | 64-bit floating point number | 3.14 |
string | String | "sensorbee" |
blob | Binary large object | A blob value cannot directly be written in BQL. |
timestamp | Datetime information in UTC | A timestamp value cannot directly be written in BQL. |
array | Array | [1, "2", 3.4] |
map | Map with string keys | {"a": 1, "b": "2", "c": 3.4} |
These types are designed to work well with JSON. They can be converted to or from JSON with some restrictions.
Note
User defined types are not available at the moment.
Types¶
This section describes the detailed specification of each type.
null¶
The type null only has one value: NULL, which represents an empty or undefined value.
An array can contain NULL as follows:
[1, NULL, 3.4]
A map can also contain NULL as its value:
{
"some_key": NULL
}
This map is different from an empty map {} because the key "some_key" actually exists in the map but the empty map doesn't even have a key.
NULL is converted to null in JSON.
bool¶
The type bool has two values: true and false. In terms of a three-valued logic, NULL represents the third state, "unknown".
true and false are converted to true and false in JSON, respectively.
int¶
The type int is a 64-bit integer type. Its minimum value is -9223372036854775808 and its maximum value is +9223372036854775807. Using an integer value out of this range results in an error.
Note
Due to bug #56, the current minimum value that can be parsed is actually -9223372036854775807.
An int value is converted to a number in JSON.
Note
Some implementations of JSON use a 64-bit floating point number for all numerical values. Therefore, they might not be able to handle integers greater than or equal to 9007199254740992 (i.e. 2^53) accurately.
float¶
The type float is a 64-bit floating point type. Its implementation is IEEE 754 on most platforms, but some platforms could use other implementations.
A float value is converted to a number in JSON.
Note
Some expressions and functions may result in an infinity or a NaN. Because JSON doesn't have an infinity or a NaN notation, they will become null when they are converted to JSON.
string¶
The type string is similar to SQL's text type. It may contain character sequences of arbitrary length and any valid UTF-8 character, including a null character.
A string value is converted to a string in JSON.
blob¶
The type blob is a data type for any variable length binary data. There is no way to write a value directly in BQL yet, but there are some ways to use blob in BQL:
- Emitting a tuple containing a blob value from a source
- Casting a string encoded in base64 to blob
- Calling a function returning a blob value
A blob value is converted to a base64-encoded string in JSON.
timestamp¶
The type timestamp has date and time information in UTC. timestamp only guarantees precision in microseconds. There is no way to write a value directly in BQL yet, but there are some ways to use timestamp in BQL:
- Emitting a tuple containing a timestamp value from a source
- Casting a value of a type that is convertible to timestamp
- Calling a function returning a timestamp value
A timestamp value is converted to a string in RFC3339 format with nanosecond precision in JSON: "2006-01-02T15:04:05.999999999Z07:00". Although the format can express nanoseconds, timestamp in BQL only guarantees microsecond precision as described above.
array¶
The type array provides an ordered sequence of values of any type, for example:
[1, "2", 3.4]
An array value can also contain another array or map as a value:
[
[1, "2", 3.4],
[
["4", 5.6, 7],
[true, false, NULL],
{"a": 10}
],
{
"nested_array": [12, 34.5, "67"]
}
]
An array value is converted to an array in JSON.
map¶
The type map represents an unordered set of key-value pairs. A key needs to be a string and a value can be of any type:
{
"a": 1,
"b": "2",
"c": 3.4
}
A map value can contain another map or array as its value:
{
"a": {
"aa": 1,
"ab": "2",
"ac": 3.4
},
"b": {
"ba": {"a": 10},
"bb": ["4", 5.6, 7],
"bc": [true, false, NULL]
},
"c": [12, 34.5, "67"]
}
A map is converted to an object in JSON.
Conversions¶
BQL provides a CAST(value AS type) operator, or value::type as syntactic sugar, that converts the given value to a corresponding value in the given type, if those types are convertible. For example, CAST(1 AS string), or 1::string, converts the int value 1 to a string value and results in "1". Converting to the same type as the value's type is valid. For instance, "str"::string does not do anything and results in "str".
The following types are valid as the target type of the CAST operator:
bool
int
float
string
blob
timestamp
Specifying null, array, or map as the target type results in an error.
This section describes how type conversions work in BQL.
Note
Converting a NULL value into any type results in NULL; this is not explicitly described in the subsections below.
To bool¶
The following types can be converted to bool:
int
float
string
blob
timestamp
array
map
From int¶
0 is converted to false. Other values are converted to true.
From float¶
0.0, -0.0, and NaN are converted to false. Other values including infinity result in true.
From string¶
The following values are converted to true:
"t"
"true"
"y"
"yes"
"on"
"1"
The following values are converted to false:
"f"
"false"
"n"
"no"
"off"
"0"
Comparison is case-insensitive and leading and trailing whitespace in a value is ignored. For example, " tRuE "::bool is true. Converting a value that is not mentioned above results in an error.
From blob¶
An empty blob value is converted to false. Other values are converted to true.
From timestamp¶
January 1, year 1, 00:00:00 UTC is converted to false. Other values are converted to true.
From array¶
An empty array is converted to false. Other values result in true.
From map¶
An empty map is converted to false. Other values result in true.
To int¶
The following types can be converted to int:
bool
float
string
timestamp
From bool¶
true::int results in 1 and false::int results in 0.
From float¶
Converting a float value into an int value truncates the decimal part. That is, for positive numbers it results in the greatest int value less than or equal to the float value, for negative numbers it results in the smallest int value greater than or equal to the float value:
1.0::int -- => 1
1.4::int -- => 1
1.5::int -- => 1
2.01::int -- => 2
(-1.0)::int -- => -1
(-1.4)::int -- => -1
(-1.5)::int -- => -1
(-2.01)::int -- => -2
The conversion results in an error when the float value is out of the valid range of int values.
From string¶
When converting a string value into an int value, the CAST operator tries to parse it as an integer value. If the string contains a float-shaped value (even if it is "1.0"), the conversion fails.
"1"::int -- => 1
The conversion results in an error when the string value contains a number that is out of the valid range of int values, or when the value isn't a number at all. For example, "1a"::int results in an error even though the value starts with a number.
From timestamp¶
A timestamp value is converted to an int value as the number of full seconds elapsed since January 1, 1970 UTC:
("1970-01-01T00:00:00Z"::timestamp)::int -- => 0
("1970-01-01T00:00:00.123456Z"::timestamp)::int -- => 0
("1970-01-01T00:00:01Z"::timestamp)::int -- => 1
("1970-01-02T00:00:00Z"::timestamp)::int -- => 86400
("2016-01-18T09:22:40.123456Z"::timestamp)::int -- => 1453108960
To float¶
The following types can be converted to float:
bool
int
string
timestamp
From bool¶
true::float results in 1.0 and false::float results in 0.0.
From int¶
int values are converted to the nearest float values:
1::float -- => 1.0
((9000000000000012345::float)::int)::string -- => "9000000000000012288"
From string¶
A string value is parsed and converted to the nearest float value:
"1.1"::float -- => 1.1
"1e-1"::float -- => 0.1
"-1e+1"::float -- => -10.0
From timestamp¶
A timestamp value is converted to a float value as the number of seconds (including a decimal part) elapsed since January 1, 1970 UTC. The integral part of the result contains seconds and the decimal part contains microseconds:
("1970-01-01T00:00:00Z"::timestamp)::float -- => 0.0
("1970-01-01T00:00:00.000001Z"::timestamp)::float -- => 0.000001
("1970-01-02T00:00:00.000001Z"::timestamp)::float -- => 86400.000001
To string
¶
Following types can be converted to string
:
bool
int
float
blob
timestamp
array
map
From bool
¶
true::string
results in "true"
, false::string
results in "false"
.
Note
Keep in mind that only the specific strings listed in From string above can be
cast back to bool; for example, "false"::bool results in false, and casting
any other string results in an error.
From int
¶
An int
value is formatted as a signed decimal integer:
1::string -- => "1"
(-24)::string -- => "-24"
From float
¶
A float
value is formatted as a signed decimal floating point. Scientific
notation is used when necessary:
1.2::string -- => "1.2"
10000000000.0::string -- => "1e+10"
From blob
¶
A blob
value is converted to a string
value encoded in base64.
Note
Keep in mind that the blob
/string
conversion using CAST
always
involves base64 encoding/decoding. It is not possible to see the single
bytes of a blob
using only the CAST
operator. If there is a
source that emits blob
data where it is known that this is actually
a valid UTF-8 string (for example, JSON or XML data), the interpretation
“as a string” (as opposed to “to string”) must be performed by a UDF.
From timestamp
¶
A timestamp
value is formatted in RFC3339 format with nanosecond precision:
"2006-01-02T15:04:05.999999999Z07:00"
.
From array
¶
An array
value is formatted as a JSON array:
[1, "2", 3.4]::string -- => "[1,""2"",3.4]"
From map
¶
A map
value is formatted as a JSON object:
{"a": 1, "b": "2", "c": 3.4}::string -- => "{""a"":1,""b"":""2"",""c"":3.4}"
To timestamp
¶
Following types can be converted to timestamp
:
int
float
string
From int
¶
An int
value to be converted to a timestamp
value is assumed to have
the number of seconds elapsed since January 1, 1970 UTC:
0::timestamp -- => 1970-01-01T00:00:00Z
1::timestamp -- => 1970-01-01T00:00:01Z
1453108960::timestamp -- => 2016-01-18T09:22:40Z
From float
¶
A float
value to be converted to a timestamp
value is assumed to have
the number of seconds elapsed since January 1, 1970 UTC. Its integral
part should have seconds and decimal part should have microseconds:
0.0::timestamp -- => 1970-01-01T00:00:00Z
0.000001::timestamp -- => 1970-01-01T00:00:00.000001Z
86400.000001::timestamp -- => 1970-01-02T00:00:00.000001Z
From string
¶
A string
value is parsed in RFC3339 format, or RFC3339 with nanosecond
precision format:
"1970-01-01T00:00:00Z"::timestamp -- => 1970-01-01T00:00:00Z
"1970-01-01T00:00:00.000001Z"::timestamp -- => 1970-01-01T00:00:00.000001Z
"1970-01-02T00:00:00.000001Z"::timestamp -- => 1970-01-02T00:00:00.000001Z
Converting ill-formed string
values to timestamp
results in an error.
Operators¶
This chapter introduces operators used in BQL.
Arithmetic Operators¶
BQL provides the following arithmetic operators:
Operator | Description | Example | Result |
---|---|---|---|
+ | Addition | 6 + 1 | 7 |
- | Subtraction | 6 - 1 | 5 |
+ | Unary plus | +4 | 4 |
- | Unary minus | -4 | -4 |
* | Multiplication | 3 * 2 | 6 |
/ | Division | 7 / 2 | 3 |
% | Modulo | 5 % 3 | 2 |
All operators accept both integers and floating point numbers. Integers and
floating point numbers can be mixed in a single arithmetic expression. For
example, 3 + 5 * 2.5
is valid.
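For example, assuming the usual promotion of integers to floating point numbers in mixed expressions, such expressions evaluate as follows:
3 + 5 * 2.5 -- => 15.5
7 / 2       -- => 3
7 / 2.0     -- => 3.5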
Note
Unary minus operators can be applied to a value multiple times. However,
each unary minus operators must be separated by a space like - - -3
because --
and succeeding characters are parsed as a comment. For
example, ---3
is parsed as --
and a comment body -3
.
String Operators¶
BQL provides the following string operators:
Operator | Description | Example | Result |
---|---|---|---|
|| | Concatenation | "Hello" || ", world" | "Hello, world" |
||
only accepts strings and NULL
. For example, "1" || 2
results in
an error. When one operand is NULL
, the result is also NULL
.
For instance, NULL || "str"
, "str" || NULL
, and NULL || NULL
result
in NULL
.
Comparison Operators¶
BQL provides the following comparison operators:
Operator | Description | Example | Result |
---|---|---|---|
< | Less than | 1 < 2 | true |
> | Greater than | 1 > 2 | false |
<= | Less than or equal to | 1 <= 2 | true |
>= | Greater than or equal to | 1 >= 2 | false |
= | Equal to | 1 = 2 | false |
<> or != | Not equal to | 1 != 2 | true |
IS NULL | Null check | false IS NULL | false |
IS NOT NULL | Non-null check | false IS NOT NULL | true |
All comparison operators return a boolean value.
<
, >
, <=
, and >=
are only valid when
- either both operands are numeric values (i.e. integers or floating point numbers)
- or both operands have the same type and that type is comparable.
The following types are comparable:
null
int
float
string
timestamp
Valid examples are as follows:
1 < 2.1
- Integers and floating point numbers can be compared.
"abc" > "def"
1::timestamp <= 2::timestamp
NULL > "a"
- This expression is valid although it always results in NULL. See NULL Comparison below.
=
, <>
, and !=
are valid for any type even if both operands have
different types. When the types of operands are different, =
results in
false
; <>
and !=
return true
. (However, integers and floating point
numbers can be compared, for example 1 = 1.0
returns true
.) When
operands have the same type, =
results in true
if both values are
equivalent and others return false
.
Note
Floating point values with the value NaN
are treated specially
as per the underlying floating point implementation. In particular,
=
comparison will always be false if one or both of the operands
is NaN
.
NULL
Comparison¶
In a three-valued logic, comparing any value with NULL
results in NULL
.
For example, all of following expressions result in NULL
:
1 < NULL
2 > NULL
"a" <= NULL
3 = NULL
NULL = NULL
NULL <> NULL
Therefore, do not look for NULL
values with expression = NULL
.
To check if a value is NULL
or not, use IS NULL
or IS NOT NULL
operator. expression IS NULL
operator returns true
only when an
expression is NULL
.
Note
[NULL] = [NULL]
and {"a": NULL} = {"a": NULL}
result in true
although this contradicts the three-valued logic. This specification is
provided for convenience. Arrays or maps often have NULL
to indicate
that there’s no value for a specific key but the key actually exists. In
other words, {"a": NULL, "b": 1}
and {"b": 1}
are different.
Therefore, NULL
in arrays and maps is compared as if it were a regular
value. Unlike NULL
, comparing NaN
floating point values
always results in false
.
Presence/Absence Check¶
In BQL, the JSON object {"a": 6, "b": NULL}
is different from {"a": 6}
.
Therefore, when accessing b
in the latter object, the result is not
NULL
but an error. To check whether a key is present in a map, the following
operators can be used:
Operator | Description | Example | Example Input | Result |
---|---|---|---|---|
IS MISSING | Absence Check | b IS MISSING | {"a": 6} | true |
IS NOT MISSING | Presence Check | b IS NOT MISSING | {"a": 6} | false |
Since the presence/absence check is done before the value is actually
extracted from the map, only JSON Path expressions can be used with
IS [NOT] MISSING
, not arbitrary expressions. For example,
a + 2 IS MISSING
is not a valid expression.
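Like IS NULL, these operators are typically used in a filter. A hypothetical example using a stream src:
SELECT RSTREAM * FROM src [RANGE 1 TUPLES] WHERE b IS NOT MISSING;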
Logical Operators¶
BQL provides the following logical operators:
Operator | Description | Example | Result |
---|---|---|---|
AND | Logical and | 1 < 2 AND 2 < 3 | true |
OR | Logical or | 1 < 2 OR 2 > 3 | true |
NOT | Logical negation | NOT 1 < 2 | false |
Logical operators also follow the three-valued logic. For example,
true AND NULL
and NULL OR false
result in NULL
.
Functions¶
BQL provides a number of built-in functions that are described in this chapter. Function names and meaning of parameters have been heavily inspired by PostgreSQL. However, be aware that the accepted and returned types may differ as there is no simple mapping between BQL and SQL data types. See the Function Reference for details about each function’s behavior.
Numeric Functions¶
General Functions¶
The table below shows some common mathematical functions that can be used in BQL.
Function | Description |
---|---|
abs(x) | absolute value |
cbrt(x) | cube root |
ceil(x) | round up to nearest integer |
degrees(x) | radians to degrees |
div(y, x) | integer quotient of y/x |
exp(x) | exponential |
floor(x) | round down to nearest integer |
ln(x) | natural logarithm |
log(x) | base 10 logarithm |
log(b, x) | logarithm to base b |
mod(y, x) | remainder of y/x |
pi() | “π” constant |
power(a, b) | a raised to the power of b |
radians(x) | degrees to radians |
round(x) | round to nearest integer |
sign(x) | sign of the argument (-1, 0, +1) |
sqrt(x) | square root |
trunc(x) | truncate toward zero |
width_bucket(x, l, r, c) | bucket of x in a histogram |
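These functions can be tried interactively with the EVAL statement (introduced later in Evaluating Examples). The results below are sketches; exact return types depend on each function, so see the Function Reference for the authoritative behavior:
EVAL abs(-3);                   -- => 3
EVAL power(2, 10);              -- => 1024 (possibly returned as a float)
EVAL width_bucket(5, 0, 10, 5); -- => 3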
Pseudo-Random Functions¶
The table below shows functions for generating pseudo-random numbers.
Function | Description |
---|---|
random() | random value in the range 0.0 <= x < 1.0 |
setseed(x) | set seed (-1.0 <= x <= 1.0) for subsequent random() calls |
String Functions¶
The table below shows some common functions for strings that can be used in BQL.
Function | Description |
---|---|
bit_length(s) | number of bits in string |
btrim(s) | remove whitespace from the start/end of s |
btrim(s, chars) | remove chars from the start/end of s |
char_length(s) | number of characters in s |
concat(s [, ...]) | concatenate all arguments |
concat_ws(sep, s [, ...]) | concatenate arguments s with separator |
format(s, [x, ...]) | format arguments using a format string |
lower(s) | convert s to lower case |
ltrim(s) | remove whitespace from the start of s |
ltrim(s, chars) | remove chars from the start of s |
md5(s) | MD5 hash of s |
octet_length(s) | number of bytes in s |
overlay(s, r, from) | replace substring |
overlay(s, r, from, for) | replace substring |
rtrim(s) | remove whitespace from the end of s |
rtrim(s, chars) | remove chars from the end of s |
sha1(s) | SHA1 hash of s |
sha256(s) | SHA256 hash of s |
strpos(s, t) | location of substring t in s |
substring(s, r) | extract substring matching regex r from s |
substring(s, from) | extract substring |
substring(s, from, for) | extract substring |
upper(s) | convert s to upper case |
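A few hedged EVAL examples (see the Function Reference for exact semantics):
EVAL lower("Hello, World");         -- => "hello, world"
EVAL char_length("hello");          -- => 5
EVAL concat_ws("-", "a", "b", "c"); -- => "a-b-c"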
Time Functions¶
Function | Description |
---|---|
distance_us(u, v) | signed temporal distance from u to v in microseconds |
clock_timestamp() | current date and time (changes during statement execution) |
now() | date and time when processing of current tuple was started |
Array Functions¶
Function | Description |
---|---|
array_length(a) | number of elements in an array |
Other Scalar Functions¶
Function | Description |
---|---|
coalesce(x [, ...]) | return first non-null input parameter |
Aggregate Functions¶
Aggregate functions compute a single result from a set of input values. The built-in normal aggregate functions are listed in the table below. The special syntax considerations for aggregate functions are explained in Aggregate Expressions.
Function | Description |
---|---|
array_agg(x) | input values, including nulls, concatenated into an array |
avg(x) | the average (arithmetic mean) of all input values |
bool_and(x) | true if all input values are true, otherwise false |
bool_or(x) | true if at least one input value is true, otherwise false |
count(x) | number of input rows for which x is not null |
count(*) | number of input rows |
json_object_agg(k, v) | aggregates name/value pairs as a map |
max(x) | maximum value of x across all input values |
median(x) | the median of all input values |
min(x) | minimum value of x across all input values |
string_agg(x, sep) | input values concatenated into a string, separated by sep |
sum(x) | sum of x across all input values |
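Aggregate functions are used together with grouping in a SELECT. The following is a hypothetical example; the purchases stream and its brand and price fields are assumptions, not built-ins:
SELECT RSTREAM brand, count(*) AS cnt, avg(price) AS avg_price
    FROM purchases [RANGE 60 SECONDS]
    GROUP BY brand;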
Server Programming¶
This part describes the extensibility of the SensorBee server. Topics covered in this part are advanced and should be read after understanding the basics of SensorBee and BQL.
Because SensorBee is mainly written in
The Go Programming Language, understanding the language
before reading this part is also recommended.
A Tour of Go is an official tutorial and is one
of the best tutorials of the language. It runs on web browsers and does not
require any additional software installation. After learning the language,
How to Write Go Code helps to understand
how to use the go
tool and the standard way to develop Go packages and
applications.
This part assumes that the go
tool is installed and the development environment
including Go’s environment variables like GOROOT
or GOPATH
is
appropriately set up. SensorBee requires Go 1.4 or later.
Extending the SensorBee Server and BQL¶
Many features of the server and BQL can be extended by plugins. This chapter describes what types of features can be extended by users. The following chapters in this part describe how to develop those features in specific programming languages.
User-Defined Functions¶
A user-defined function (UDF) is a function that is implemented by a user and registered in the SensorBee server. Once it is registered, it can be called from BQL statements:
SELECT RSTREAM my_udf(field) FROM src [RANGE 1 TUPLES];
A UDF behaves just like a built-in function. A UDF can take an arbitrary number
of arguments. Each argument can be any of built-in types
and can receive multiple types of values. A UDF can also support a variable-length
argument. A UDF has a single return value of any built-in type. When multiple return
values are required, a UDF can return the value as an array
.
Note
BQL currently does not support CREATE FUNCTION
statements like well-known
RDBMSs. UDFs can only be added through Go programs.
User-Defined Aggregate Functions¶
A user-defined aggregate function (UDAF) is a function similar to a UDF but can take aggregation parameters (see Aggregate Expressions) as arguments in addition to regular arguments.
User-Defined Stream-Generating Functions¶
Stream-generating functions can also be user-defined. There are two types of UDSFs. The first type behaves like a source, which is not connected to any input stream and generates tuples proactively:
... FROM my_counter() [RANGE ...
my_counter
above may emit tuples like {"count": 1}
.
UDSFs of this type are called source-like UDSFs.
The other type is called a stream-like UDSF and behaves just like a stream, which receives tuples from one or more incoming streams or sources. It receives names of streams or sources as its arguments:
... FROM my_udsf("another_stream", "yet_another_stream", other_params) [RANGE ...
Note that there is no rule on how to define a UDSF's arguments. Thus, the order and
the use of arguments depend on each UDSF. For example, a UDSF might take an
array
of string
containing names of input streams as its first argument:
my_union(["stream1", "stream2", "stream3"])
. Names of input stream do not
even need to be located at the beginning of the argument list:
my_udfs2(1, "another_stream")
is also possible.
Using UDSFs is a very powerful way to extend BQL since they can potentially do
anything that the SELECT
cannot do.
User-Defined States¶
A user-defined state (UDS) can be provided to support stateful data processing
(see Stateful Data Processing). A UDS is usually provided with a set of UDFs that
manipulate the state. Those UDFs take the name of the UDS as a string
argument:
CREATE STATE event_id_seq TYPE snowflake_id WITH machine_id = 1;
CREATE STREAM events_with_id AS
SELECT snowflake_id("event_id_seq"), * FROM events [RANGE 1 TUPLES];
In the example above, a UDS event_id_seq
is created with the type
snowflake_id
. Then, the UDS is passed to the UDF snowflake_id
, which
happens to have the same name as the type name of the UDS. The UDF looks up
the UDS event_id_seq
and returns a value computed based on the state.
Source Plugins¶
A source type developed by a user can be added to the SensorBee server as a plugin
so that it can be used in CREATE SOURCE
statements. A source type can have any
number of required and optional parameters. Each parameter can have any of
built-in types.
Sink Plugins¶
A sink type developed by a user can be added to the SensorBee server as a plugin
so that it can be used in CREATE SINK
statements. A sink type can have any
number of required and optional parameters. Each parameter can have any of
built-in types.
Extensions in Go¶
This chapter describes how to extend the SensorBee server in the Go programming language.
Development Flow of Components in Go¶
The typical development flow of components like a UDF, a UDS type, a source type, or a sink type should be discussed before looking into details of each component.
The basic flow is as follows:
- Create a git repository for components
- Implement components
- Create a plugin subpackage in the repository
Create a Git Repository for Components¶
Components are written in Go, so they need to be in a valid git repository (or a repository of a different version control system). One repository may provide multiple types of components. For example, a repository could have 10 UDFs, 5 UDS types, 2 source types, and 1 sink type. However, since Go is very well designed to provide packages in a fine-grained manner, each repository should only provide a minimum set of components that are logically related and make sense to be in the same repository.
Implement Components¶
The next step is to implement components. There is no restriction on which standard or 3rd party packages to depend on.
Functions or structs that are to be registered to the SensorBee server need to be referred to by the plugin subpackage, which is described in the next subsection. Thus, names of those symbols need to start with a capital letter.
In this step, components should not be registered to the SensorBee server yet.
Create a Plugin Subpackage in the Repository¶
It is highly recommended that the repository has a separate package (i.e. a
subdirectory) which only registers components to the SensorBee server. There is
usually one file named “plugin.go” in the plugin package and it only contains a
series of registration function calls in init
function. For instance, if the
repository only provides one UDF, the contents of plugin.go
would look
like:
// in github.com/user/myudf/plugin/plugin.go
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/user/myudf"
)
func init() {
udf.MustRegisterGlobalUDF("my_udf", &myudf.MyUDF{})
}
There are two reasons to have a plugin subpackage separated from the implementation of components. First, by separating them, other Go packages can import the components to use the package as a library without registering them to SensorBee. Second, having a separate plugin package allows a user to register a component with a different name. This is especially useful when names of components conflict with each other.
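For example, reusing the hypothetical github.com/user/myudf package above, a user could write their own plugin package that registers the same implementation under a non-conflicting name:
// in a user's own plugin package
package plugin

import (
	"gopkg.in/sensorbee/sensorbee.v0/bql/udf"

	"github.com/user/myudf"
)

func init() {
	// Register MyUDF under a different name to avoid a conflict.
	udf.MustRegisterGlobalUDF("my_udf2", &myudf.MyUDF{})
}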
To use the example plugin above, the github.com/user/myudf/plugin
package needs
to be added to the plugin path list of SensorBee.
Repository Organization¶
The typical organization of the repository is
github.com/user/repo
- README: description and the usage of components in the repository
- .go files: implementation of components
- plugin/: a subpackage for the plugin registration
  - plugin.go
- othersubpackages/: there can be optional subpackages
User-Defined Functions¶
This section describes how to write a UDF in Go. It first shows the basic interface of defining UDFs, and then describes utilities around it, how to develop a UDF in a Go-ish manner, and a complete example.
Implementing a UDF¶
Note
This is a very low-level way to implement a UDF in Go. To learn about an easier way, see Generic UDFs.
Any struct implementing the following interface can be used as a UDF:
type UDF interface {
// Call calls the UDF.
Call(*core.Context, ...data.Value) (data.Value, error)
// Accept checks if the function accepts the given number of arguments
// excluding core.Context.
Accept(arity int) bool
// IsAggregationParameter returns true if the k-th parameter expects
// aggregated values. A UDF with Accept(n) == true is an aggregate
// function if and only if this function returns true for one or more
// values of k in the range 0, ..., n-1.
IsAggregationParameter(k int) bool
}
This interface is defined in the gopkg.in/sensorbee/sensorbee.v0/bql/udf
package.
A UDF can be registered via the RegisterGlobalUDF
or MustRegisterGlobalUDF
functions from the same package. MustRegisterGlobalUDF
is the same as
RegisterGlobalUDF
but panics on failure instead of returning an error. These
functions are usually called from the init
function in the UDF package’s
plugin
subpackage. A typical implementation of a UDF looks as follows:
type MyUDF struct {
...
}
func (m *MyUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
...
}
func (m *MyUDF) Accept(arity int) bool {
...
}
func (m *MyUDF) IsAggregationParameter(k int) bool {
...
}
func init() {
// MyUDF can be used as my_udf in BQL statements.
udf.MustRegisterGlobalUDF("my_udf", &MyUDF{})
}
As it can be inferred from this example, a UDF itself should be stateless since it only registers one instance of a struct as a UDF and it will be shared globally. Stateful data processing can be achieved by the combination of UDFs and UDSs, which is described in User-Defined States.
A UDF needs to implement three methods to satisfy udf.UDF
interface:
Call
, Accept
, and IsAggregationParameter
.
The Call
method receives a *core.Context
and multiple data.Value
as its
arguments. *core.Context
contains the information of the current processing
context. Call
’s ...data.Value
argument holds the values passed to the UDF.
data.Value
represents a value used in BQL and can be any of built-in
types.
SELECT RSTREAM my_udf(arg1, arg2) FROM stream [RANGE 1 TUPLES];
In this example, arg1
and arg2
are passed to the Call
method:
func (m *MyUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
// When my_udf(arg1, arg2) is called, len(args) is 2.
// args[0] is arg1 and args[1] is arg2.
// It is guaranteed that m.Accept(len(args)) is always true.
}
Because data.Value
is a semi-variant type, the Call
method needs to check
the type of each data.Value
and convert it to a desired type.
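As a minimal sketch (assuming a hypothetical UDF that doubles its single numeric argument), the data package's ToXXX functions can be used for weak conversion and the AsXXX functions for strict checks:
func (m *MyUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
	// Weakly convert the argument; accepts e.g. 3, 3.0, or "3".
	// Use data.AsFloat instead for a strict type check.
	v, err := data.ToFloat(args[0])
	if err != nil {
		return nil, err
	}
	return data.Float(v * 2), nil
}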
The Accept
method verifies if the UDF accepts the specific number of arguments.
It can return true for multiple arities as long as it can receive the given
number of arguments. If a UDF only accepts two arguments, the method is implemented
as follows:
func (m *MyUDF) Accept(arity int) bool {
return arity == 2
}
When a UDF aims to support variadic parameters (a.k.a. variable-length
arguments) with two required arguments (e.g.
my_udf(arg1, arg2, optional1, optional2, ...)
), the implementation would be:
func (m *MyUDF) Accept(arity int) bool {
return arity >= 2
}
Finally, IsAggregationParameter
returns whether the k-th argument (starting
from 0) is an aggregation parameter. Aggregation parameters are passed as a
data.Array
containing all values of a field in each group.
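For example, a hypothetical aggregate UDF that counts the values of its first (aggregated) argument could be sketched as follows:
func (m *MyAggUDF) IsAggregationParameter(k int) bool {
	// Only the first parameter receives aggregated values.
	return k == 0
}

func (m *MyAggUDF) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
	// args[0] is a data.Array holding all values of the field in the group.
	values, err := data.AsArray(args[0])
	if err != nil {
		return nil, err
	}
	return data.Int(len(values)), nil
}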
All of these methods can be called concurrently from multiple goroutines and they must be thread-safe.
The registered UDF is looked up based on its name and the number of arguments passed to it.
SELECT RSTREAM my_udf(arg1, arg2) FROM stream [RANGE 1 TUPLES];
In this SELECT
, a UDF having the name my_udf
is looked up first. After
that, its Accept
method is called with 2 and my_udf
is actually selected
if Accept(2)
returned true. IsAggregationParameter
method is
additionally called on each argument to see if the argument needs to be an
aggregation parameter. Then, if there is no mismatch, my_udf
is finally
called.
Note
A UDF does not have a schema at the moment, so any error regarding types of arguments will not be reported until the statement calling the UDF actually processes a tuple.
Generic UDFs¶
SensorBee provides a helper function to register a regular Go function as a UDF
without implementing the UDF
interface explicitly.
func Inc(v int) int {
return v + 1
}
This function Inc
can be transformed into a UDF by ConvertGeneric
or MustConvertGeneric
function defined in the
gopkg.in/sensorbee/sensorbee.v0/bql/udf
package. By combining it with
RegisterGlobalUDF
, the Inc
function can easily be registered as a UDF:
func init() {
udf.MustRegisterGlobalUDF("inc", udf.MustConvertGeneric(Inc))
}
So, a complete example of the UDF implementation and registration is as follows:
package inc
import (
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
)
func Inc(v int) int {
return v + 1
}
func init() {
udf.MustRegisterGlobalUDF("inc", udf.MustConvertGeneric(Inc))
}
Note
A UDF implementation and registration should actually be separated to different packages. See Development Flow of Components in Go for details.
Although this approach is handy, there is some small overhead compared to a UDF implemented in the regular way. Most of such overhead comes from type checking and conversions.
Functions passed to ConvertGeneric
need to satisfy some restrictions on
the form of their argument and return value types. Each restriction is described
in the following subsections.
Form of Arguments¶
In terms of valid argument forms, there are some rules to follow:
- A Go function can receive
*core.Context
as the first argument, or can omit it. - A function can have any number of arguments including 0 arguments as long as Go accepts them.
- A function can be variadic with or without non-variadic parameters.
There are basically eight (four times two, whether a function has
*core.Context
or not) forms of arguments (return values are
intentionally omitted for clarity):
Functions receiving no argument in BQL (e.g. my_udf()):
- func(*core.Context): a function only receiving *core.Context
- func(): a function having no argument and not receiving *core.Context, either
Functions having non-variadic arguments but no variadic arguments:
- func(*core.Context, T1, T2, ..., Tn)
- func(T1, T2, ..., Tn)
Functions having variadic arguments but no non-variadic arguments:
- func(*core.Context, ...T)
- func(...T)
Functions having both variadic and non-variadic arguments:
- func(*core.Context, T1, T2, ..., Tn, ...Tn+1)
- func(T1, T2, ..., Tn, ...Tn+1)
Here are some examples of invalid function signatures:
- func(T, *core.Context): *core.Context must be the first argument.
- func(NonSupportedType): only supported types, which will be explained later, can be used.
Although return values are omitted from all the examples above, they are actually required. The next subsection explains how to define valid return values.
Form of Return Values¶
All functions need to have return values. There are two forms of return values:
func(...) R
func(...) (R, error)
All other forms are invalid:
func(...)
func(...) error
func(...) NonSupportedType
Valid types of return values are same as the valid types of arguments, and they are listed in the following subsection.
Valid Value Types¶
The list of Go types that can be used for arguments and the return value is as follows:
- bool
- signed integers: int, int8, int16, int32, int64
- unsigned integers: uint, uint8, uint16, uint32, uint64
- float32, float64
- string
- time.Time
- data: data.Bool, data.Int, data.Float, data.String, data.Blob, data.Timestamp, data.Array, data.Map, data.Value
- a slice of any type above, including data.Value

data.Value can be used as a semi-variant type, which will receive all of the types above.
When the argument type and the actual value type are different, weak type
conversion are applied to values. Conversions are basically done by
data.ToXXX
functions (see godoc comments of each function in
data/type_conversions.go
). For example, func inc(i int) int
can be called by
inc("3")
in a BQL statement and it will return 4. If a strict type checking
or custom type conversion is required, receive values as data.Value
and
manually check or convert types, or define the UDF in the regular way.
Examples of Valid Go Functions¶
The following functions can be converted to UDFs by ConvertGeneric
or
MustConvertGeneric
function:
func rand() int
func pow(*core.Context, float32, float32) (float32, error)
func join(*core.Context, ...string) string
func format(string, ...data.Value) (string, error)
func keys(data.Map) []string
Complete Examples¶
This subsection shows three example UDFs:
my_inc
my_join
my_join2
Assume that these are in the repository github.com/sensorbee/examples/udfs
(which actually does not exist). The repository has three files:
inc.go
join.go
plugin/plugin.go
inc.go¶
In inc.go
, the Inc
function is defined as a pure Go function with a standard
value type:
package udfs
func Inc(v int) int {
return v + 1
}
join.go¶
In join.go
, the Join
UDF is defined in a strict way. It also performs
strict type checking. It is designed to be called in one of two forms:
my_join("a", "b", "c", "separator")
or
my_join(["a", "b", "c"], "separator")
. Each argument and value in the array
must be a string. The UDF receives an arbitrary number of arguments.
package udfs
import (
"errors"
"strings"
"pfi/sensorbee/sensorbee/core"
"pfi/sensorbee/sensorbee/data"
)
type Join struct {
}
func (j *Join) Call(ctx *core.Context, args ...data.Value) (data.Value, error) {
empty := data.String("")
if len(args) == 1 {
return empty, nil
}
switch args[0].Type() {
case data.TypeString: // my_join("a", "b", "c", "sep") form
var ss []string
for _, v := range args {
s, err := data.AsString(v)
if err != nil {
return empty, err
}
ss = append(ss, s)
}
return data.String(strings.Join(ss[:len(ss)-1], ss[len(ss)-1])), nil
case data.TypeArray: // my_join(["a", "b", "c"], "sep") form
if len(args) != 2 {
return empty, errors.New("wrong number of arguments for my_join(array, sep)")
}
sep, err := data.AsString(args[1])
if err != nil {
return empty, err
}
a, _ := data.AsArray(args[0])
var ss []string
for _, v := range a {
s, err := data.AsString(v)
if err != nil {
return empty, err
}
ss = append(ss, s)
}
return data.String(strings.Join(ss, sep)), nil
default:
return empty, errors.New("the first argument must be a string or an array")
}
}
func (j *Join) Accept(arity int) bool {
return arity >= 1
}
func (j *Join) IsAggregationParameter(k int) bool {
return false
}
plugin/plugin.go¶
In addition to Inc
and Join
, this file registers the standard Go
function strings.Join
as my_join2
. Because it’s converted to a UDF by
udf.MustConvertGeneric
, arguments are weakly converted to given types.
For example, my_join2([1, 2.3, "4"], "-")
is valid although strings.Join
itself is func([]string, string) string
.
package plugin
import (
"strings"
"pfi/sensorbee/sensorbee/bql/udf"
"pfi/nobu/docexamples/udfs"
)
func init() {
udf.MustRegisterGlobalUDF("my_inc", udf.MustConvertGeneric(udfs.Inc))
udf.MustRegisterGlobalUDF("my_join", &udfs.Join{})
udf.MustRegisterGlobalUDF("my_join2", udf.MustConvertGeneric(strings.Join))
}
Evaluating Examples¶
Once the sensorbee
command is built with those UDFs and a topology is
created on the server, the EVAL
statement can be used to test them:
EVAL my_inc(1); -- => 2
EVAL my_inc(1.5); -- => 2
EVAL my_inc("10"); -- => 11
EVAL my_join("a", "b", "c", "-"); -- => "a-b-c"
EVAL my_join(["a", "b", "c"], ","); -- => "a,b,c"
EVAL my_join(1, "b", "c", "-"); -- => error
EVAL my_join([1, "b", "c"], ","); -- => error
EVAL my_join2(["a", "b", "c"], ","); -- => "a,b,c"
EVAL my_join2([1, "b", "c"], ","); -- => "1,b,c"
Dynamic Loading¶
Dynamic loading of UDFs written in Go is not supported at the moment because Go does not support loading packages dynamically.
User-Defined Stream-Generating Functions¶
This section describes how to write a UDSF in Go.
Implementing a UDSF¶
To provide a UDSF, two interfaces need to be implemented: UDSF
and
UDSFCreator
.
The interface UDSF
is defined as follows in the
gopkg.in/sensorbee/sensorbee.v0/bql/udf
package.
type UDSF interface {
Process(ctx *core.Context, t *core.Tuple, w core.Writer) error
Terminate(ctx *core.Context) error
}
The Process
method processes an input tuple and emits computed tuples for
subsequent streams. ctx
contains the processing context information. t
is the tuple to be processed in the UDSF. w
is the destination to where
computed tuples are emitted. The Terminate
method is called when the UDSF
becomes unnecessary. The method has to release all the resources the UDSF has.
How the Process
method is called depends on the type of a UDSF. When a UDSF
is a stream-like UDSF (i.e. it has input from other streams), the Process
method is called every time a new tuple arrives. The argument t
contains the
tuple emitted from another stream. Stream-like UDSFs have to return from
Process
immediately after processing the input tuple. They must not
block in the method.
A stream-like UDSF is used mostly when multiple tuples need to be computed and emitted based on one input tuple:
type WordSplitter struct {
field string
}
func (w *WordSplitter) Process(ctx *core.Context, t *core.Tuple, writer core.Writer) error {
var kwd []string
if v, ok := t.Data[w.field]; !ok {
return fmt.Errorf("the tuple doesn't have the required field: %v", w.field)
} else if s, err := data.AsString(v); err != nil {
return fmt.Errorf("'%v' field must be string: %v", w.field, err)
} else {
kwd = strings.Split(s, " ")
}
for _, k := range kwd {
out := t.Copy()
out.Data[w.field] = data.String(k)
if err := writer.Write(ctx, out); err != nil {
return err
}
}
return nil
}
func (w *WordSplitter) Terminate(ctx *core.Context) error {
return nil
}
WordSplitter
splits text in a specific field by space. For example, when
an input tuple is {"word": "a b c"}
and WordSplitter.Field
is word
,
following three tuples will be emitted: {"word": "a"}
, {"word": "b"}
,
and {"word": "c"}
.
When a UDSF is a source-like UDSF, the Process
method is only called once
with a tuple that does not mean anything. Unlike a stream-like UDSF, the
Process
method of a source-like UDSF does not have to return until it has
emitted all tuples, the Terminate
method is called, or a fatal error occurs.
type Ticker struct {
interval time.Duration
stopped int32
}
func (t *Ticker) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
var i int64
for ; atomic.LoadInt32(&t.stopped) == 0; i++ {
newTuple := core.NewTuple(data.Map{"tick": data.Int(i)})
if err := w.Write(ctx, newTuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Terminate(ctx *core.Context) error {
atomic.StoreInt32(&t.stopped, 1)
return nil
}
In this example, Ticker
emits tuples having tick
field containing
a counter until the Terminate
method is called.
Whether a UDSF is stream-like or source-like can be configured when it is
created by UDSFCreator
. The interface UDSFCreator
is defined as follows
in gopkg.in/sensorbee/sensorbee.v0/bql/udf
package:
type UDSFCreator interface {
CreateUDSF(ctx *core.Context, decl UDSFDeclarer, args ...data.Value) (UDSF, error)
Accept(arity int) bool
}
The CreateUDSF
method creates a new instance of a UDSF. The method is called
when evaluating a UDSF in the FROM
clause of a SELECT
statement. ctx
contains the processing context information. decl
is used to customize the
behavior of the UDSF, which is explained later. args
has arguments passed in the
SELECT
statement. The Accept
method verifies if the UDSF accepts the
specific number of arguments. This is the same as the UDF.Accept
method (see
User-Defined Functions).
UDSFDeclarer
is used in the CreateUDSF
method to customize the
behavior of a UDSF:
type UDSFDeclarer interface {
Input(name string, config *UDSFInputConfig) error
ListInputs() map[string]*UDSFInputConfig
}
By calling its Input
method, a UDSF will be able to receive tuples from
another stream with the name name
. Because the name
is given outside the
UDSF, it cannot be controlled from within the UDSF. However, there are cases where a UDSF
wants to know from which stream a tuple has come. For example, when providing
a UDSF performing a JOIN of two streams, the UDSF needs to distinguish which
stream emitted the tuple. If the UDSF was defined as
my_join(left_stream, right_stream)
, decl
can be used as follows in
UDSFCreator.CreateUDSF
:
decl.Input(args[0], &UDSFInputConfig{InputName: "left"})
decl.Input(args[1], &UDSFInputConfig{InputName: "right"})
By configuring the input stream in this way, a tuple passed to UDSF.Process
has
the given name in its Tuple.InputName
field:
func (m *MyJoin) Process(ctx *core.Context, t *core.Tuple, w core.Writer) error {
switch t.InputName {
case "left":
... process tuples from left_stream ...
case "right":
... process tuples from right_stream ...
}
...
}
If a UDSF is configured to have one or more input streams by decl.Input
in
the UDSFCreator.CreateUDSF
method, the UDSF is processed as a stream-like
UDSF. Otherwise, if a UDSF doesn’t have any input (i.e. decl.Input
is not
called), the UDSF becomes a source-like UDSF.
As an example, the UDSFCreator
of WordSplitter
is shown below:
type WordSplitterCreator struct {
}
func (w *WordSplitterCreator) CreateUDSF(ctx *core.Context,
decl udf.UDSFDeclarer, args ...data.Value) (udf.UDSF, error) {
input, err := data.AsString(args[0])
if err != nil {
return nil, fmt.Errorf("input stream name must be a string: %v", args[0])
}
field, err := data.AsString(args[1])
if err != nil {
return nil, fmt.Errorf("target field name must be a string: %v", args[1])
}
// This Input call makes the UDSF a stream-like UDSF.
if err := decl.Input(input, nil); err != nil {
return nil, err
}
return &WordSplitter{
field: field,
}, nil
}
func (w *WordSplitterCreator) Accept(arity int) bool {
return arity == 2
}
Although the UDSF has not been registered to the SensorBee server yet, it could
appear like word_splitter(input_stream_name, target_field_name)
if it was
registered with the name word_splitter
.
For another example, the UDSFCreator
of Ticker
is shown below:
type TickerCreator struct {
}
func (t *TickerCreator) CreateUDSF(ctx *core.Context,
decl udf.UDSFDeclarer, args ...data.Value) (udf.UDSF, error) {
interval, err := data.ToDuration(args[0])
if err != nil {
return nil, err
}
// Since this is a source-like UDSF, there's no input.
return &Ticker{
interval: interval,
}, nil
}
func (t *TickerCreator) Accept(arity int) bool {
return arity == 1
}
Like word_splitter
, its signature could be ticker(interval)
if the UDSF
is registered as ticker
.
The implementation of this UDSF is completed and the next step is to register it to the SensorBee server.
Registering a UDSF¶
A UDSF can be used in BQL by registering its UDSFCreator
interface to
the SensorBee server using the RegisterGlobalUDSFCreator
or
MustRegisterGlobalUDSFCreator
functions, which are defined in
gopkg.in/sensorbee/sensorbee.v0/bql/udf
.
The following example registers WordSplitter
and Ticker
:
func init() {
udf.RegisterGlobalUDSFCreator("word_splitter", &WordSplitterCreator{})
udf.RegisterGlobalUDSFCreator("ticker", &TickerCreator{})
}
Generic UDSFs¶
Just like UDFs have a ConvertGeneric
function, UDSFs also have
ConvertToUDSFCreator
and MustConvertToUDSFCreator
function. They convert
a regular function satisfying some restrictions to the UDSFCreator
interface.
The restrictions are the same as for
generic UDFs except that a
function converted to the UDSFCreator
interface has an additional argument
UDSFDeclarer
. UDSFDeclarer
is located after *core.Context
and before
other arguments. Examples of valid function signatures are shown below:
func(*core.Context, UDSFDeclarer, int)
func(UDSFDeclarer, string)
func(UDSFDeclarer)
func(*core.Context, UDSFDeclarer, ...data.Value)
func(UDSFDeclarer, ...float64)
func(*core.Context, UDSFDeclarer, int, ...string)
func(UDSFDeclarer, int, float64, ...time.Time)
Unlike *core.Context
, UDSFDeclarer
cannot be omitted. The same set of
types can be used for arguments as types that ConvertGeneric
function
accepts.
WordSplitterCreator
can be rewritten with the ConvertToUDSFCreator
function as follows:
func CreateWordSplitter(decl udf.UDSFDeclarer,
inputStream, field string) (udf.UDSF, error) {
if err := decl.Input(inputStream, nil); err != nil {
return nil, err
}
return &WordSplitter{
field: field,
}, nil
}
func init() {
udf.RegisterGlobalUDSFCreator("word_splitter",
udf.MustConvertToUDSFCreator(CreateWordSplitter))
}
TickerCreator
can be replaced with ConvertToUDSFCreator
, too:
func CreateTicker(decl udf.UDSFDeclarer, i data.Value) (udf.UDSF, error) {
interval, err := data.ToDuration(i)
if err != nil {
return nil, err
}
return &Ticker{
interval: interval,
}, nil
}
func init() {
udf.MustRegisterGlobalUDSFCreator("ticker",
udf.MustConvertToUDSFCreator(CreateTicker))
}
A Complete Example¶
This subsection provides a complete example of UDSFs described in this section.
In addition to word_splitter
and ticker
, the example also includes the
lorem
source, which periodically emits random texts as
{"text": "lorem ipsum dolor sit amet"}
.
Assume that the import path of the example repository is
github.com/sensorbee/examples/udsfs
, which doesn’t actually exist. The
repository has four files:
lorem.go
splitter.go
ticker.go
plugin/plugin.go
lorem.go¶
To learn how to implement a source plugin, see Source Plugins.
package udsfs
import (
"math/rand"
"strings"
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
var (
Lorem = strings.Split(strings.Replace(`lorem ipsum dolor sit amet
consectetur adipiscing elit sed do eiusmod tempor incididunt ut labore et dolore
magna aliqua Ut enim ad minim veniam quis nostrud exercitation ullamco laboris
nisi ut aliquip ex ea commodo consequat Duis aute irure dolor in reprehenderit
in voluptate velit esse cillum dolore eu fugiat nulla pariatur Excepteur sint
occaecat cupidatat non proident sunt in culpa qui officia deserunt mollit anim
id est laborum`, "\n", " ", -1), " ")
)
type LoremSource struct {
interval time.Duration
}
func (l *LoremSource) GenerateStream(ctx *core.Context, w core.Writer) error {
for {
var text []string
for l := rand.Intn(5) + 5; l > 0; l-- {
text = append(text, Lorem[rand.Intn(len(Lorem))])
}
t := core.NewTuple(data.Map{
"text": data.String(strings.Join(text, " ")),
})
if err := w.Write(ctx, t); err != nil {
return err
}
time.Sleep(l.interval)
}
}
func (l *LoremSource) Stop(ctx *core.Context) error {
return nil
}
func CreateLoremSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return core.ImplementSourceStop(&LoremSource{
interval: interval,
}), nil
}
splitter.go¶
package udsfs
import (
"fmt"
"strings"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type WordSplitter struct {
field string
}
func (w *WordSplitter) Process(ctx *core.Context,
t *core.Tuple, writer core.Writer) error {
var kwd []string
if v, ok := t.Data[w.field]; !ok {
return fmt.Errorf("the tuple doesn't have the required field: %v", w.field)
} else if s, err := data.AsString(v); err != nil {
return fmt.Errorf("'%v' field must be string: %v", w.field, err)
} else {
kwd = strings.Split(s, " ")
}
for _, k := range kwd {
out := t.Copy()
out.Data[w.field] = data.String(k)
if err := writer.Write(ctx, out); err != nil {
return err
}
}
return nil
}
func (w *WordSplitter) Terminate(ctx *core.Context) error {
return nil
}
func CreateWordSplitter(decl udf.UDSFDeclarer,
inputStream, field string) (udf.UDSF, error) {
if err := decl.Input(inputStream, nil); err != nil {
return nil, err
}
return &WordSplitter{
field: field,
}, nil
}
ticker.go¶
package udsfs
import (
"sync/atomic"
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Ticker struct {
interval time.Duration
stopped int32
}
func (t *Ticker) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
var i int64
for ; atomic.LoadInt32(&t.stopped) == 0; i++ {
newTuple := core.NewTuple(data.Map{"tick": data.Int(i)})
if err := w.Write(ctx, newTuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Terminate(ctx *core.Context) error {
atomic.StoreInt32(&t.stopped, 1)
return nil
}
func CreateTicker(decl udf.UDSFDeclarer, i data.Value) (udf.UDSF, error) {
interval, err := data.ToDuration(i)
if err != nil {
return nil, err
}
return &Ticker{
interval: interval,
}, nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/sensorbee/examples/udsfs"
)
func init() {
bql.MustRegisterGlobalSourceCreator("lorem",
bql.SourceCreatorFunc(udsfs.CreateLoremSource))
udf.MustRegisterGlobalUDSFCreator("word_splitter",
udf.MustConvertToUDSFCreator(udsfs.CreateWordSplitter))
udf.MustRegisterGlobalUDSFCreator("ticker",
udf.MustConvertToUDSFCreator(udsfs.CreateTicker))
}
Example BQL Statements¶
CREATE SOURCE lorem TYPE lorem;
CREATE STREAM lorem_words AS
SELECT RSTREAM * FROM word_splitter("lorem", "text") [RANGE 1 TUPLES];
Results of word_splitter
can be received by the following SELECT
:
SELECT RSTREAM * FROM lorem_words [RANGE 1 TUPLES];
User-Defined States¶
This section describes how to write a UDS in Go.
Implementing a UDS¶
A struct implementing the following interface can be used as a UDS:
type SharedState interface {
Terminate(ctx *Context) error
}
This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core
package.
Terminate
method is called when the UDS is no longer in use. It should
release any resource that the UDS has allocated so far.
As an example, a UDS having a monotonically increasing counter can be implemented as follows:
type Counter struct {
c int64
}
func (c *Counter) Terminate(ctx *core.Context) error {
return nil
}
func (c *Counter) Next() int64 {
return atomic.AddInt64(&c.c, 1)
}
At the moment, there’s no way to manipulate the UDS from BQL statements. UDSs are usually provided with a set of UDFs that read or update the UDS. It’ll be described later in Manipulating a UDS via a UDF. Before looking into the UDS manipulation, registering and creating a UDS needs to be explained.
Registering a UDS¶
To register a UDS to the SensorBee server, the UDS needs to provide its
UDSCreator
. UDSCreator
is an interface defined in
gopkg.in/sensorbee/sensorbee.v0/bql/udf
package as follows:
type UDSCreator interface {
CreateState(ctx *core.Context, params data.Map) (core.SharedState, error)
}
UDSCreator.CreateState
method is called when executing a CREATE STATE
statement. The method creates a new instance of the UDS and initializes it with
the given parameters. The argument ctx
has the processing context
information and params
has parameters specified in the WITH
clause of
the CREATE STATE
.
The creator can be registered by RegisterGlobalUDSCreator
or
MustRegisterGlobalUDSCreator
function defined in
gopkg.in/sensorbee/sensorbee.v0/bql/udf
package.
The following is the implementation and the registration of the creator for
Counter
UDS above:
type CounterCreator struct {
}
func (c *CounterCreator) CreateState(ctx *core.Context,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if v, ok := params["start"]; ok {
i, err := data.ToInt(v)
if err != nil {
return nil, err
}
cnt.c = i - 1
}
return cnt, nil
}
func init() {
udf.MustRegisterGlobalUDSCreator("my_counter", &CounterCreator{})
}
The creator in this example is registered with the UDS type name my_counter
.
The creator supports start
parameter which is used as the first value that
Counter.Next
returns. The parameter can be specified in the CREATE STATE
as follows:
CREATE STATE my_counter_instance TYPE my_counter WITH start = 100;
Because the creator creates a new instance every time the CREATE STATE
is
executed, there can be multiple instances of a specific UDS type:
CREATE STATE my_counter_instance1 TYPE my_counter;
CREATE STATE my_counter_instance2 TYPE my_counter;
CREATE STATE my_counter_instance3 TYPE my_counter;
...
Once an instance of the UDS is created by the CREATE STATE
, UDFs can refer
them and manipulate their state.
udf.UDSCreatorFunc
¶
A function having the same signature as UDSCreator.CreateState
can be
converted into UDSCreator
by the udf.UDSCreatorFunc
utility function:
func UDSCreatorFunc(f func(*core.Context, data.Map) (core.SharedState, error)) UDSCreator
For example, CounterCreator
can be defined as a function and registered as
follows with this utility:
func CreateCounter(ctx *core.Context,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if v, ok := params["start"]; ok {
i, err := data.ToInt(v)
if err != nil {
return nil, err
}
cnt.c = i - 1
}
return cnt, nil
}
func init() {
udf.MustRegisterGlobalUDSCreator("my_counter",
udf.UDSCreatorFunc(CreateCounter))
}
To support SAVE STATE
and LOAD STATE
statements, however, this utility
function cannot be used because the creator needs to have the LoadState
method. How to support saving and loading is described later.
Manipulating a UDS via a UDF¶
To manipulate a UDS from BQL statements, a set of UDFs that read or update the UDS has to be provided with it:
func Next(ctx *core.Context, uds string) (int64, error) {
s, err := ctx.SharedStates.Get(uds)
if err != nil {
return 0, err
}
c, ok := s.(*Counter)
if !ok {
return 0, fmt.Errorf("the state isn't a counter: %v", uds)
}
return c.Next(), nil
}
func init() {
udf.MustRegisterGlobalUDF("my_next_count", udf.MustConvertGeneric(Next))
}
In this example, a UDF my_next_count
is registered to the SensorBee server.
The UDF calls Counter.Next
method to obtain the next count and returns it.
The UDF receives one argument uds
that is the name of the UDS to be updated.
CREATE STATE my_counter_instance TYPE my_counter;
CREATE STREAM events_with_id AS
SELECT RSTREAM my_next_count("my_counter_instance") AS id, *
FROM events [RANGE 1 TUPLES];
The BQL statements above add IDs to tuples emitted from a stream events
. The
state my_counter_instance
is created with the type my_counter
. Then,
my_next_count
UDF is called with the name. Every time the UDF is called,
the state of my_counter_instance
is updated by its Next
method.
my_next_count
(i.e. Next
function in Go) can look up the instance of
the UDS by its name through core.Context.SharedStates
. SharedStates
manages all the UDSs created in a topology. Its Get method
returns the
instance of the UDS having the given name. It returns an error if it couldn’t
find the instance. In the example above, my_next_count("my_counter_instance")
will look up an instance of the UDS having the name my_counter_instance
,
which was previously created by the CREATE STATE
. The UDS returned from
Get
method has the type core.SharedState
and cannot directly be used as
Counter
. Therefore, it has to be cast to *Counter
.
Since the state can be any type satisfying core.SharedState
, a UDS can
potentially have any information such as machine learning models,
dictionaries for natural language processing, or even an in-memory database.
Note
As UDFs are concurrently called from multiple goroutines, UDSs also need to be thread-safe.
Saving and Loading a UDS¶
Counter
implemented so far doesn’t support saving and loading its state.
Thus, its count will be reset every time the server restarts. To save the
state and load it later on, the UDS and its creator need to provide some
methods. After providing those method, the state can be saved by the
SAVE STATE
statement and loaded by LOAD STATE
statement.
Supporting SAVE STATE
¶
By adding Save
method having the following signature to a UDS, the UDS can
be saved by the SAVE STATE
statement:
Save(ctx *core.Context, w io.Writer, params data.Map) error
Save
method writes all the data that the state has to w io.Writer
.
The data can be written in any format as long as corresponding loading methods
can reconstruct the state from it. It can be in JSON, msgpack, Protocol Buffer,
and so on.
Warning
Providing forward/backward compatibility or version controlling of the saved data is the responsibility of the author of the UDS.
*core.Context
has the processing context information. params
argument
is not used at the moment and is reserved for future use.
Once Save method is provided, the UDS can be saved by SAVE STATE
statement:
SAVE STATE my_counter_instance;
The SAVE STATE
doesn’t take any parameters now. The location and the
physical format of the saved UDS data depend on the configuration of the
SensorBee server or program running BQL statements. However, it is guaranteed
that the saved data can be loaded by the same program via the LOAD STATE
statement, which is described later.
Save
method of previously implemented Counter
can be as follows:
func (c *Counter) Save(ctx *core.Context, w io.Writer, params data.Map) error {
return binary.Write(w, binary.LittleEndian, atomic.LoadInt64(&c.c))
}
Note
Because this counter is very simple, there’s no version controlling logic in the method. As the minimum solution, having a version number at the beginning of the data is sufficient for most cases.
Supporting LOAD STATE
¶
To support the LOAD STATE
statement, a UDSCreator
needs to have
LoadState
method having the following signature:
LoadState(ctx *core.Context, r io.Reader, params data.Map) (core.SharedState, error)
Note
LoadState
method needs to be defined in a UDSCreator
, not in the
UDS itself.
LoadState
method reads data from r io.Reader
. The data has exactly the
same format as the one previously written by Save
method of a UDS.
params
has parameters specified in the SET
clause in the LOAD STATE
statement.
Note
Parameters specified in the SET
clause don’t have to be the same as the ones
given in the WITH
clause of the CREATE STATE
statement. See
LOAD STATE for details.
When LoadState
method returns an error, the LOAD STATE
statement with
CREATE IF NOT SAVED
doesn’t fall back to CREATE STATE
, but it just fails.
Once LoadState
method is added to the UDSCreator
, the saved state can be
loaded by LOAD STATE
statement.
LoadState
method of previously implemented CounterCreator
can be as
follows:
func (c *CounterCreator) LoadState(ctx *core.Context, r io.Reader,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if err := binary.Read(r, binary.LittleEndian, &cnt.c); err != nil {
return nil, err
}
return cnt, nil
}
Providing Load
method in a UDS¶
In addition to implementing LoadState
method in a UDS’s creator, a UDS
itself can provide Load
method. While LoadState
method creates a new
state instance and replace it with the previous instance, Load
method
dynamically modifies the existing instance. Therefore, Load
method can
potentially be more efficient than LoadState
method although it has to
provide appropriate failure handling and concurrency control so that (1) the
UDS doesn’t become invalid on failure (i.e. Load
method is “exception
safe”) or by concurrent calls, and (2) other operations on the UDS don’t block
for a long time.
The signature of Load
method is almost the same as LoadState
method
except that Load
method doesn’t return a new core.SharedState
but
updates the UDS itself instead:
Load(ctx *Context, r io.Reader, params data.Map) error
Load
method of previously implemented Counter
can be as follows:
func (c *Counter) Load(ctx *core.Context, r io.Reader, params data.Map) error {
var cnt int64
if err := binary.Read(r, binary.LittleEndian, &cnt); err != nil {
return err
}
atomic.StoreInt64(&c.c, cnt)
return nil
}
How Loading is Processed¶
SensorBee uses the two loading methods, LoadState and Load, according to the following rules:
- When a UDS’s creator doesn’t provide the LoadState method, the LOAD STATE statement fails.
  - The LOAD STATE statement fails even if the UDS implements its Load method. To support the statement, the LoadState method is always required in its creator. This is because the Load method only works when an instance of the UDS has already been created or loaded, and it cannot be used for a nonexistent instance.
  - The LOAD STATE CREATE IF NOT SAVED statement also fails if the LoadState method isn’t provided. The statement calls the CreateState method when the state hasn’t previously been saved. Otherwise, it tries to load the saved data. Therefore, if the data was previously saved and an instance of the UDS hasn’t been created yet, the statement cannot create a new instance without the LoadState method in the creator. To be consistent across conditions, the LOAD STATE CREATE IF NOT SAVED statement fails if the LoadState method isn’t provided, regardless of whether the state has been saved before.
- When a UDS’s creator provides the LoadState method and the UDS doesn’t provide the Load method, the LOAD STATE statement loads the state through the LoadState method.
  - It creates a new instance, so loading temporarily consumes twice as much memory.
- When a UDS’s creator provides the LoadState method and the UDS also provides the Load method:
  - The Load method is used when the instance has already been created or loaded. The LoadState method isn’t used even if the Load method fails.
  - The LoadState method is used otherwise.
Note
As mentioned in the list above, the LoadState method always needs to be provided even if a UDS implements the Load method.
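One way to satisfy this requirement without duplicating deserialization logic, shown here as a hedged sketch rather than the official implementation, is to have the creator's LoadState simply delegate to the state's Load method:
func (c *CounterCreator) LoadState(ctx *core.Context, r io.Reader,
    params data.Map) (core.SharedState, error) {
    // Create an empty state and let its Load method restore the contents.
    cnt := &Counter{}
    if err := cnt.Load(ctx, r, params); err != nil {
        return nil, err
    }
    return cnt, nil
}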
A Complete Example¶
A complete example of the state is shown in this subsection. Assume that the
import path of the example repository is
github.com/sensorbee/examples/counter
, which doesn’t actually exist. The
repository has two files:
- counter.go
- plugin/plugin.go
counter.go¶
package counter
import (
"encoding/binary"
"fmt"
"io"
"sync/atomic"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Counter struct {
c int64
}
func (c *Counter) Terminate(ctx *core.Context) error {
return nil
}
func (c *Counter) Next() int64 {
return atomic.AddInt64(&c.c, 1)
}
func (c *Counter) Save(ctx *core.Context, w io.Writer, params data.Map) error {
return binary.Write(w, binary.LittleEndian, atomic.LoadInt64(&c.c))
}
func (c *Counter) Load(ctx *core.Context, r io.Reader, params data.Map) error {
var cnt int64
if err := binary.Read(r, binary.LittleEndian, &cnt); err != nil {
return err
}
atomic.StoreInt64(&c.c, cnt)
return nil
}
type CounterCreator struct {
}
func (c *CounterCreator) CreateState(ctx *core.Context,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if v, ok := params["start"]; ok {
i, err := data.ToInt(v)
if err != nil {
return nil, err
}
cnt.c = i - 1
}
return cnt, nil
}
func (c *CounterCreator) LoadState(ctx *core.Context, r io.Reader,
params data.Map) (core.SharedState, error) {
cnt := &Counter{}
if err := binary.Read(r, binary.LittleEndian, &cnt.c); err != nil {
return nil, err
}
return cnt, nil
}
func Next(ctx *core.Context, uds string) (int64, error) {
s, err := ctx.SharedStates.Get(uds)
if err != nil {
return 0, err
}
c, ok := s.(*Counter)
if !ok {
return 0, fmt.Errorf("the state isn't a counter: %v", uds)
}
return c.Next(), nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"github.com/sensorbee/examples/counter"
)
func init() {
udf.MustRegisterGlobalUDSCreator("my_counter",
&counter.CounterCreator{})
udf.MustRegisterGlobalUDF("my_next_count",
udf.MustConvertGeneric(counter.Next))
}
Writing Tuples to a UDS¶
When a UDS implements core.Writer
, the INSERT INTO
statement can
insert tuples into the UDS via the uds
sink:
type Writer interface {
Write(*Context, *Tuple) error
}
The following is an example of using the uds sink:
CREATE STATE my_state TYPE my_state_type;
CREATE SINK my_state_sink TYPE uds WITH name = "my_state";
INSERT INTO my_state_sink FROM some_stream;
If my_state_type
doesn’t implement core.Writer
, the CREATE SINK
statement fails. Every time some_stream
emits a tuple, the Write
method of my_state
is called.
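For illustration, the Counter example could accept written tuples by implementing core.Writer. The following is a hedged sketch that is not part of the example repository; it simply increments the counter for every tuple inserted through the uds sink:
func (c *Counter) Write(ctx *core.Context, t *core.Tuple) error {
    // Each tuple written through the uds sink increments the counter.
    atomic.AddInt64(&c.c, 1)
    return nil
}
An INSERT INTO statement targeting a uds sink bound to this state would then count the tuples flowing through it.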
Example¶
Models provided by the Jubatus machine learning plugin for SensorBee implement the Write method. When tuples are inserted into the UDS, it trains the model it holds.
Source Plugins¶
This section describes how to implement a source as a plugin of SensorBee.
Implementing a Source¶
A struct implementing the following interface can be a source:
type Source interface {
GenerateStream(ctx *Context, w Writer) error
Stop(ctx *Context) error
}
This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core
package.
The GenerateStream method actually generates tuples for subsequent streams. The argument ctx contains the information of the current processing context. w is the destination to which generated tuples are emitted. The Stop method stops GenerateStream. It should wait until the GenerateStream method call returns, but that isn't mandatory.
Once the GenerateStream method is called, a source can emit as many tuples as it requires. A source basically needs to return from its GenerateStream method when:
- it emitted all the tuples it has
- the Stop method was called
- a fatal error occurred
The Stop
method can be called concurrently while the GenerateStream
method is working and it must be thread-safe. As long as a source is used by
components defined in SensorBee, it’s guaranteed that its Stop
method is
called only once and it doesn’t have to be idempotent. However, it is
recommended that a source provide a termination check in its Stop
method
to avoid a double free problem.
A typical implementation of a source is shown below:
func (s *MySource) GenerateStream(ctx *core.Context, w core.Writer) error {
<initialization>
defer func() {
<clean up>
}()
for <check stop> {
t := <create a new tuple>
if err := w.Write(ctx, t); err != nil {
return err
}
}
return nil
}
func (s *MySource) Stop(ctx *core.Context) error {
<turn on a stop flag>
<wait until GenerateStream stops>
return nil
}
The following example source emits tuples periodically:
type Ticker struct {
interval time.Duration
stopped int32
}
func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
var cnt int64
for ; ; cnt++ {
if atomic.LoadInt32(&t.stopped) != 0 {
break
}
tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
if err := w.Write(ctx, tuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Stop(ctx *core.Context) error {
atomic.StoreInt32(&t.stopped, 1)
return nil
}
The interval field is initialized in SourceCreator, which is described later. This is the source version of the example in the UDSF section. This implementation is slightly incorrect because the Stop method doesn't wait until the GenerateStream method actually returns. Because implementing a thread-safe source that stops correctly is a difficult task, the core package provides a utility function that implements a source's Stop method on behalf of the source itself. See Utilities for details.
Registering a Source¶
To register a source to the SensorBee server, the source needs to provide its
SourceCreator
. The SourceCreator
interface is defined in
the gopkg.in/sensorbee/sensorbee.v0/bql package as follows:
type SourceCreator interface {
CreateSource(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Source, error)
}
It only has one method: CreateSource
. The CreateSource
method is called
when the CREATE SOURCE statement is executed.
The ctx
argument contains the information of the current processing context.
ioParams
has the name and the type name of the source, which are given in
the CREATE SOURCE
statement. params
has parameters specified in the
WITH
clause of the CREATE SOURCE
statement.
The creator can be registered by RegisterGlobalSourceCreator
or
MustRegisterGlobalSourceCreator
function. As an example, the creator of
Ticker
above can be implemented and registered as follows:
type TickerCreator struct {
}
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return &Ticker{
interval: interval,
}, nil
}
func init() {
bql.MustRegisterGlobalSourceCreator("ticker", &TickerCreator{})
}
In this example, the source has a parameter interval
which can be specified
in the WITH
clause of the CREATE SOURCE
statement:
CREATE SOURCE my_ticker TYPE ticker WITH interval = 0.1;
my_ticker emits tuples that look like {"tick": 123} every 100ms. Without the interval parameter, my_ticker will emit tuples every second by default.
Types of a Source¶
In addition to a regular source, there’re two more types of sources: a resumable source and a rewindable source. This subsection describes those sources in detail.
Resumable Sources¶
A source that supports the PAUSE SOURCE and RESUME SOURCE statements is called a resumable source.
Although all sources support them by default via the core package, a source can explicitly implement the core.Resumable interface to provide more efficient pause and resume capabilities:
type Resumable interface {
Pause(ctx *Context) error
Resume(ctx *Context) error
}
The Pause method is called when the PAUSE SOURCE statement is executed, and the Resume method is called by RESUME SOURCE. The Pause method may be called even when the source is already paused, and the same holds for the Resume method.
A source can be made non-resumable by implementing these methods to return an error:
type MyNonResumableSource struct {
...
}
...
func (m *MyNonResumableSource) Pause(ctx *core.Context) error {
return errors.New("my_non_resumable_source doesn't support pause")
}
func (m *MyNonResumableSource) Resume(ctx *core.Context) error {
return errors.New("my_non_resumable_source doesn't support resume")
}
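Conversely, a source can implement Pause and Resume to suspend tuple generation more cheaply than the default support. The following is a hedged sketch only: the PausableTicker type, its constructor, and its fields are hypothetical, and the sync package is assumed to be imported in addition to the packages used by the earlier Ticker example. Like the earlier Ticker, its Stop method doesn't wait for GenerateStream to return.
type PausableTicker struct {
    interval time.Duration
    stopped  int32

    mu     sync.Mutex
    paused bool
    cond   *sync.Cond
}

func NewPausableTicker(interval time.Duration) *PausableTicker {
    t := &PausableTicker{interval: interval}
    t.cond = sync.NewCond(&t.mu)
    return t
}

func (t *PausableTicker) waitWhilePaused() {
    t.mu.Lock()
    defer t.mu.Unlock()
    for t.paused {
        t.cond.Wait()
    }
}

func (t *PausableTicker) GenerateStream(ctx *core.Context, w core.Writer) error {
    var cnt int64
    for ; atomic.LoadInt32(&t.stopped) == 0; cnt++ {
        // Block here while the source is paused.
        t.waitWhilePaused()
        tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
        if err := w.Write(ctx, tuple); err != nil {
            return err
        }
        time.Sleep(t.interval)
    }
    return nil
}

func (t *PausableTicker) Stop(ctx *core.Context) error {
    atomic.StoreInt32(&t.stopped, 1)
    return t.Resume(ctx) // unblock GenerateStream if it's currently paused
}

func (t *PausableTicker) Pause(ctx *core.Context) error {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.paused = true
    return nil
}

func (t *PausableTicker) Resume(ctx *core.Context) error {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.paused = false
    t.cond.Broadcast()
    return nil
}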
Rewindable Sources¶
A rewindable source can re-generate the same tuples again from the beginning after it emits all tuples or while it’s emitting tuples. A rewindable source supports the REWIND SOURCE statement.
A source can become rewindable by implementing the core.RewindableSource
interface:
type RewindableSource interface {
Source
Resumable
Rewind(ctx *Context) error
}
A rewindable source also needs to implement core.Resumable
to be rewindable.
Note
The reason a rewindable source also needs to be resumable lies in the internal implementation of the default pause/resume support. While a source is paused, it blocks the core.Writer.Write call in the GenerateStream method. The Rewind method could also be blocked while the Write call is blocked, until the Resume method is called. This of course depends on the implementation of a source, but it's very error-prone. Therefore, implementing the Resumable interface is currently required for a source to be rewindable.
Unlike a regular source, the GenerateStream method of a rewindable source must not return after it emits all tuples. Instead, it needs to wait until the Rewind method or the Stop method is called. Once it returns, the source is considered stopped, and no further operation, including the REWIND SOURCE statement, will work on the source.
Due to its nature, a stream isn't often rewindable. A rewindable source is mostly used for relatively static data sources such as relations or files. Also, because implementing the RewindableSource interface is even harder than implementing the Resumable interface, the utilities described below are usually used.
Utilities¶
There’re some utilities to support implementing sources and its creators. This subsection describes each utility.
core.ImplementSourceStop
¶
core.ImplementSourceStop
is a function that implements the Stop
method
of a source in a thread-safe manner:
func ImplementSourceStop(s Source) Source
A source returned from this function is resumable, but not rewindable even if
the original source implements the core.RewindableSource
interface. In
addition, although a source passed to core.ImplementSourceStop
can
explicitly implement the core.Resumable
interface, its Pause and Resume methods will never be called because the source returned from
core.ImplementSourceStop
also implements those methods and controls
pause and resume.
To apply this function, a source must satisfy the following restrictions:
- The GenerateStream method must be implemented so that it can safely be called again after it returns.
- The GenerateStream method must return when core.Writer.Write returns core.ErrSourceStopped. The method must return exactly the same error variable that the writer returned.
- The Stop method must just return nil. This means all resource allocation and deallocation must be done within the GenerateStream method.
A typical implementation of a source passed to core.ImplementSourceStop
is
shown below:
func (s *MySource) GenerateStream(ctx *core.Context, w core.Writer) error {
<initialization>
defer func() {
<clean up>
}()
for {
t := <create a new tuple>
if err := w.Write(ctx, t); err != nil {
return err
}
}
return nil
}
func (s *MySource) Stop(ctx *core.Context) error {
return nil
}
If a source wants to ignore errors returned from core.Writer.Write
other
than core.ErrSourceStopped
, the GenerateStream
method can be modified
as:
if err := w.Write(ctx, t); err != nil {
if err == core.ErrSourceStopped {
return err
}
}
By applying core.ImplementSourceStop
, the Ticker
above can be
implemented as follows:
type Ticker struct {
interval time.Duration
}
func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
var cnt int64
for ; ; cnt++ {
tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
if err := w.Write(ctx, tuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Stop(ctx *core.Context) error {
return nil
}
type TickerCreator struct {
}
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return core.ImplementSourceStop(&Ticker{
interval: interval,
}), nil
}
There’s no stopped
flag now. In this version, the Stop
method of the
source returned by core.ImplementSourceStop
waits until the
GenerateStream
method returns.
core.NewRewindableSource
¶
core.NewRewindableSource
is a function that converts a regular source into
a rewindable source:
func NewRewindableSource(s Source) RewindableSource
A source returned from this function is resumable and rewindable. A source
passed to the function needs to satisfy the same restrictions as
core.ImplementSourceStop
. In addition to that, there’s one more restriction
for core.NewRewindableSource
:
- The GenerateStream method must return when core.Writer.Write returns core.ErrSourceRewound. The method must return exactly the same error variable that the writer returned.
Although the GenerateStream method of a rewindable source must not return after it emits all tuples, a source passed to the core.NewRewindableSource function needs to return in that situation. For example, let's assume there's a source that generates tuples from each line in a file. To implement the source without the help of the utility function, its GenerateStream must wait for the Rewind method to be called after it processes all lines in the file. However, with the utility, its GenerateStream can just return once it emits all tuples. Therefore, a typical implementation of a source passed to the utility can be the same as a source for core.ImplementSourceStop.
As it will be shown later, a source that infinitely emits tuples can also be rewindable in some sense.
The following is an example of TickerCreator
modified from the example for
core.ImplementSourceStop
:
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
rewindable := false
if v, ok := params["rewindable"]; ok {
b, err := data.AsBool(v)
if err != nil {
return nil, err
}
rewindable = b
}
src := &Ticker{
interval: interval,
}
if rewindable {
return core.NewRewindableSource(src), nil
}
return core.ImplementSourceStop(src), nil
}
In this example, Ticker
has the rewindable
parameter. If it is true,
the source becomes rewindable:
CREATE SOURCE my_rewindable_ticker TYPE ticker WITH rewindable = true;
By issuing the REWIND SOURCE statement, my_rewindable_ticker resets the value of the tick field:
REWIND SOURCE my_rewindable_ticker;
-- output examples of SELECT RSTREAM * FROM my_rewindable_ticker [RANGE 1 TUPLES];
{"tick":0}
{"tick":1}
{"tick":2}
...
{"tick":123}
-- REWIND SOURCE is executed here
{"tick":0}
{"tick":1}
...
bql.SourceCreatorFunc
¶
bql.SourceCreatorFunc
is a function that converts a function having the
same signature as SourceCreator.CreateSource
to a SourceCreator
:
func SourceCreatorFunc(f func(*core.Context,
*IOParams, data.Map) (core.Source, error)) SourceCreator
For example, the TickerCreator above and its registration can be modified as follows with this utility:
func CreateTicker(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
return core.ImplementSourceStop(&Ticker{
interval: interval,
}), nil
}
func init() {
bql.MustRegisterGlobalSourceCreator("ticker",
bql.SourceCreatorFunc(CreateTicker))
}
A Complete Example¶
A complete example of Ticker
is shown in this subsection. Assume that the
import path of the example is github.com/sensorbee/examples/ticker
, which
doesn’t actually exist. There’re two files in the repository:
- ticker.go
- plugin/plugin.go
The example uses the core.NewRewindableSource utility function.
ticker.go¶
package ticker
import (
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Ticker struct {
interval time.Duration
}
func (t *Ticker) GenerateStream(ctx *core.Context, w core.Writer) error {
var cnt int64
for ; ; cnt++ {
tuple := core.NewTuple(data.Map{"tick": data.Int(cnt)})
if err := w.Write(ctx, tuple); err != nil {
return err
}
time.Sleep(t.interval)
}
return nil
}
func (t *Ticker) Stop(ctx *core.Context) error {
// This method will be implemented by utility functions.
return nil
}
type TickerCreator struct {
}
func (t *TickerCreator) CreateSource(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return nil, err
}
interval = i
}
rewindable := false
if v, ok := params["rewindable"]; ok {
b, err := data.AsBool(v)
if err != nil {
return nil, err
}
rewindable = b
}
src := &Ticker{
interval: interval,
}
if rewindable {
return core.NewRewindableSource(src), nil
}
return core.ImplementSourceStop(src), nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql"
"github.com/sensorbee/examples/ticker"
)
func init() {
bql.MustRegisterGlobalSourceCreator("ticker", &ticker.TickerCreator{})
}
Sink Plugins¶
This section describes how to implement a sink as a plugin of SensorBee.
Implementing a Sink¶
A struct implementing the following interface can be a sink:
type Sink interface {
Write(ctx *Context, t *Tuple) error
Close(ctx *Context) error
}
This interface is defined in gopkg.in/sensorbee/sensorbee.v0/core
package.
The Write method writes a tuple to the destination of the sink. The argument ctx contains the information of the current processing context. t is the tuple to be written. The Close method is called when the sink becomes unnecessary. It must release all resources allocated for the sink.
The following example sink writes a tuple as JSON to stdout:
type StdoutSink struct {
}
func (s *StdoutSink) Write(ctx *core.Context, t *core.Tuple) error {
_, err := fmt.Fprintln(os.Stdout, t.Data)
return err
}
func (s *StdoutSink) Close(ctx *core.Context) error {
// nothing to release
return nil
}
A sink is initialized by its SinkCreator
, which is described later.
Note
SensorBee doesn’t provide buffering or retry capability for sinks.
Registering a Sink¶
To register a sink to the SensorBee server, the sink needs to provide its
SinkCreator
. The SinkCreator
interface is defined in
the gopkg.in/sensorbee/sensorbee.v0/bql package as follows:
// SinkCreator is an interface which creates instances of a Sink.
type SinkCreator interface {
// CreateSink creates a new Sink instance using given parameters.
CreateSink(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Sink, error)
}
It only has one method: CreateSink
. The CreateSink
method is called
when the CREATE SINK statement is executed.
The ctx
argument contains the information of the current processing context.
ioParams
has the name and the type name of the sink, which are given in
the CREATE SINK
statement. params
has parameters specified in the
WITH
clause of the CREATE SINK
statement.
The creator can be registered by RegisterGlobalSinkCreator
or
MustRegisterGlobalSinkCreator
function. As an example, the creator of
StdoutSink
above can be implemented and registered as follows:
type StdoutSinkCreator struct {
}
func (s *StdoutSinkCreator) CreateSink(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
return &StdoutSink{}, nil
}
func init() {
bql.MustRegisterGlobalSinkCreator("my_stdout", &StdoutSinkCreator{})
}
This sink doesn’t have parameters specified in the WITH
clause of the
CREATE SINK
statement. How to handle parameters for sink is same as how
source does. See Source Plugins for more details.
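For illustration, the following is a hedged sketch of a sink that takes a parameter from the WITH clause, analogous to the interval parameter of the ticker source. The FileSink type, its path parameter, and the use of data.AsString are assumptions for this sketch and not part of the official examples (os, fmt, bql, core, and data are assumed to be imported as in the earlier examples):
type FileSink struct {
    f *os.File
}

func (s *FileSink) Write(ctx *core.Context, t *core.Tuple) error {
    _, err := fmt.Fprintln(s.f, t.Data)
    return err
}

func (s *FileSink) Close(ctx *core.Context) error {
    return s.f.Close()
}

func CreateFileSink(ctx *core.Context,
    ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
    // Default output path used when the WITH clause doesn't provide one.
    path := "out.jsonl"
    if v, ok := params["path"]; ok {
        p, err := data.AsString(v)
        if err != nil {
            return nil, err
        }
        path = p
    }
    f, err := os.Create(path)
    if err != nil {
        return nil, err
    }
    return &FileSink{f: f}, nil
}
Such a creator could be registered with bql.SinkCreatorFunc(CreateFileSink) just like the stdout example shown below.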
Utilities¶
There’s one utility function for sink plugins: SinkCreatorFunc
:
func SinkCreatorFunc(f func(*core.Context,
*IOParams, data.Map) (core.Sink, error)) SinkCreator
This utility function is defined in the gopkg.in/sensorbee/sensorbee.v0/bql package. It converts a function having the same signature as
SinkCreator.CreateSink
to a SinkCreator
. With this utility, for example,
StdoutSinkCreator
can be modified to:
func CreateStdoutSink(ctx *core.Context,
ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
return &StdoutSink{}, nil
}
func init() {
bql.MustRegisterGlobalSinkCreator("stdout",
bql.SinkCreatorFunc(CreateStdoutSink))
}
A Complete Example¶
A complete example of the sink is shown in this subsection. The package name for
the sink is stdout
and StdoutSink
is renamed to Sink
. Also, this
example uses SinkCreatorFunc
utility for SinkCreator
.
Assume that the import path of the example is
github.com/sensorbee/examples/stdout
, which doesn’t actually exist. The
repository has two files:
- stdout.go
- plugin/plugin.go
stdout.go¶
package stdout
import (
"fmt"
"os"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type Sink struct {
}
func (s *Sink) Write(ctx *core.Context, t *core.Tuple) error {
_, err := fmt.Fprintln(os.Stdout, t.Data)
return err
}
func (s *Sink) Close(ctx *core.Context) error {
return nil
}
func Create(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
return &Sink{}, nil
}
plugin/plugin.go¶
package plugin
import (
"gopkg.in/sensorbee/sensorbee.v0/bql"
"github.com/sensorbee/examples/stdout"
)
func init() {
bql.MustRegisterGlobalSinkCreator("my_stdout",
bql.SinkCreatorFunc(stdout.Create))
}
Reference¶
BQL Statements¶
CREATE SINK¶
Synopsis¶
CREATE SINK name TYPE type_name [WITH parameter_name = parameter_value [, ...]]
Description¶
CREATE SINK
creates a new sink in a topology.
Parameters¶
- name
- The name of the sink to be created.
- type_name
- The type name of the sink.
- parameter_name
- The name of a sink-specific parameter.
- parameter_value
- The value for a sink-specific parameter.
Sink Parameters¶
The optional WITH
clause specifies parameters specific to the sink.
See each sink’s documentation to find out parameters it provides.
Examples¶
To create a sink having the name “snk” with no sink-specific parameter:
CREATE SINK snk TYPE fluentd;
To create a sink with sink-specific parameters:
CREATE SINK fluentd TYPE fluentd WITH tag_field = "fluentd_tag";
As you can see, the name of a sink can be the same as the type name of a sink.
CREATE SOURCE¶
Synopsis¶
CREATE [PAUSED] SOURCE name TYPE type_name [WITH parameter_name = parameter_value [, ...]]
Description¶
CREATE SOURCE
creates a new source in a topology.
Parameters¶
PAUSED
- The source is paused when it’s created if this option is used.
- name
- The name of the source to be created.
- type_name
- The type name of the source.
- parameter_name
- The name of a source-specific parameter.
- parameter_value
- The value for a source-specific parameter.
Source Parameters¶
The optional WITH
clause specifies parameters specific to the source.
See each source’s documentation to find out parameters it provides.
Notes¶
Some sources stop after emitting all tuples. They can stop even before any subsequent statement is executed. For such sources, specify the PAUSED parameter and run RESUME SOURCE after completely setting up a topology so that no tuple emitted from the source is lost.
Examples¶
To create a source having the name “src” with no source-specific parameter:
CREATE SOURCE src TYPE dropped_tuples;
To create a source with source-specific parameters:
CREATE SOURCE fluentd TYPE fluentd WITH bind = "0.0.0.0:12345",
tag_field = "my_tag";
As you can see, the name of a source can be the same as the type name of a source.
CREATE STATE¶
Synopsis¶
CREATE STATE name TYPE type_name [WITH parameter_name = parameter_value [, ...]]
Description¶
CREATE STATE
creates a new UDS (User Defined State) in a topology.
Parameters¶
- name
- The name of the UDS to be created.
- type_name
- The type name of the UDS.
- parameter_name
- The name of a UDS-specific parameter.
- parameter_value
- The value for a UDS-specific parameter.
UDS Parameters¶
The optional WITH
clause specifies parameters specific to the UDS.
See each UDS’s documentation to find out parameters it provides.
Examples¶
To create a UDS named “my_uds” with no UDS-specific parameter:
CREATE STATE my_uds TYPE my_uds_type;
To create a UDS with UDS-specific parameters:
CREATE STATE my_ids TYPE snowflake_id WITH machine_id = 1;
CREATE STREAM¶
Synopsis¶
CREATE STREAM name AS select
Description¶
CREATE STREAM
creates a new stream (a.k.a a continuous view) in a topology.
Parameters¶
- name
- The name of the stream to be created.
- select
- The
SELECT
statement to generate a stream. select can be anySELECT
statement including a statement usingUNION ALL
.
Examples¶
To create a stream named “strm”:
CREATE STREAM strm AS SELECT RSTREAM * FROM src [RANGE 1 TUPLES];
To create a stream which merges all tuples from multiple streams:
CREATE STREAM strm AS
SELECT RSTREAM * FROM src1 [RANGE 1 TUPLES]
UNION ALL
SELECT RSTREAM * FROM src2 [RANGE 1 TUPLES];
DROP SINK¶
Synopsis¶
DROP SINK name
Description¶
DROP SINK
drops a sink that is already created in a topology. The sink can
no longer be used after executing the statement.
Parameters¶
- name
- The name of the sink to be dropped.
DROP SOURCE¶
Synopsis¶
DROP SOURCE name
Description¶
DROP SOURCE
drops a source that is already created in a topology. The
source is stopped and removed from a topology. After executing the statement,
the source cannot be used.
Parameters¶
- name
- The name of the source to be dropped.
DROP STATE¶
Synopsis¶
DROP STATE name
Description¶
DROP STATE
drops a UDS that is already created in a topology. The UDS can
no longer be used after executing the statement.
Note
Even if a uds
sink exists for the UDS, the sink will not be dropped when
dropping the UDS. The uds
sink must be dropped manually.
Parameters¶
- name
- The name of the UDS to be dropped.
DROP STREAM¶
Synopsis¶
DROP STREAM name
Description¶
DROP STREAM
drops a stream that is already created in a topology. The
stream can no longer be used after executing the statement.
Parameters¶
- name
- The name of the stream to be dropped.
INSERT INTO¶
Synopsis¶
INSERT INTO sink FROM stream
Description¶
INSERT INTO
inserts tuples from a stream or a source to a sink.
Parameters¶
- sink
- The name of the sink to which tuples are inserted.
- stream
- The name of a stream or a source.
Examples¶
To insert tuples into a sink from a source having the name “src”:
INSERT INTO snk FROM src;
LOAD STATE¶
Synopsis¶
LOAD STATE name TYPE type_name [TAG tag]
[SET set_parameter_name = set_parameter_value]
[create_if_not_saved]
where create_if_not_saved is:
OR CREATE IF NOT SAVED
[WITH create_parameter_name = create_parameter_value]
Description¶
LOAD STATE
loads a UDS that is previously saved by
SAVE STATE.
LOAD STATE
fails if the UDS hasn’t been saved yet. When
OR CREATE IF NOT SAVED
is specified, LOAD STATE
creates a new UDS with
the given optional parameters if the UDS hasn’t been saved yet.
LOAD STATE
, even with OR CREATE IF NOT SAVED
, fails if the UDS doesn’t
support the statement.
Parameters¶
- name
- The name of the UDS to be loaded.
- type_name
- The type name of the UDS.
- tag
- The name of the user defined tag for versioning of the saved UDS data. When tag is omitted, “default” is used as the default tag name.
- set_parameter_name
- The name of a UDS-specific parameter defined for
LOAD STATE
. - set_parameter_value
- The value for a UDS-specific parameter defined for
LOAD STATE
. - create_parameter_name
- The name of a UDS-specific parameter defined for
CREATE STATE
. - create_parameter_value
- The value for a UDS-specific parameter defined for
CREATE STATE
.
LOAD STATE can have two sets of parameters: set_parameters and create_parameters. set_parameters are used when there's saved UDS data having the given tag. On the other hand, create_parameters are used when the UDS hasn't been saved yet. create_parameters are exactly the same as the parameters that the UDS defines for CREATE STATE. However, set_parameters are often completely different from create_parameters. Because create_parameters are often saved as a part of the UDS's information by SAVE STATE, set_parameters don't have to include the same set of parameters defined in create_parameters.
There’re some use-cases that a UDS uses set_parameters:
Customize loading behavior
- When a UDS doesn’t provide a proper versioning of saved data,
LOAD STATE
may fail to load it due to the format incompatibility. In such a case, it’s difficult to modify saved binary data to have a format version number. Thus, providing a set_parameter specifying the format version number could be the only solution.
- When a UDS doesn’t provide a proper versioning of saved data,
Overwrite some saved values of create_parameters
Like create_parameters, set_parameters are specific to each UDS. See the documentation of each UDS to find out parameters it provides.
Examples¶
To load a UDS named “my_uds” and having the type “my_uds_type”:
LOAD STATE my_uds TYPE my_uds_type;
Note that “my_uds” needs to be saved before executing this statement. Otherwise, it fails.
To load a UDS named “my_uds” and having the type “my_uds_type” and assigned a tag “trained”:
LOAD STATE my_uds TYPE my_uds_type TAG trained;
To load a UDS with set_parameters:
LOAD STATE my_uds TYPE my_uds_type TAG trained
SET force_format_version = "v1";
To load a UDS that hasn’t been saved yet with OR CREATE IF NOT SAVED
:
LOAD STATE my_uds TYPE my_uds_type OR CREATE IF NOT SAVED;
To load a UDS that hasn’t been saved yet with OR CREATE IF NOT SAVED
with
create_parameters:
LOAD STATE my_uds TYPE my_uds_type OR CREATE IF NOT SAVED
WITH id = 1;
When the UDS hasn’t been saved previously, the statement above falls back into
CREATE STATE
as follows:
CREATE STATE my_uds TYPE my_uds_type WITH id = 1;
OR CREATE IF NOT SAVED
can be used with a tag and set_parameters:
LOAD STATE my_uds TYPE my_uds_type TAG trained SET force_format_version = "v1"
OR CREATE IF NOT SAVED WITH id = 1;
PAUSE SOURCE¶
Synopsis¶
PAUSE SOURCE name
Description¶
PAUSE SOURCE
pauses a running source so that the source stops emitting
tuples until executing RESUME SOURCE on it again. Executing
PAUSE SOURCE
on a paused source doesn’t affect anything.
PAUSE SOURCE
fails if the source doesn’t support the statement.
Parameters¶
- name
- The name of the source to be paused.
RESUME SOURCE¶
Synopsis¶
RESUME SOURCE name
Description¶
RESUME SOURCE
resumes a paused source so that the source can start to emit
tuples again. Executing RESUME SOURCE
on a running source doesn’t affect anything.
RESUME SOURCE
fails if the source doesn’t support the statement.
Parameters¶
- name
- The name of the source to be resumed.
Examples¶
A common use case of RESUME SOURCE is to resume a source created by CREATE PAUSED SOURCE.
CREATE PAUSED SOURCE src TYPE some_source_type WITH ...parameters...;
-- ... construct a topology connected to src ...
RESUME SOURCE src;
By doing this, no tuple emitted from src will be lost.
REWIND SOURCE¶
Synopsis¶
REWIND SOURCE name
Description¶
REWIND SOURCE
rewinds a source so that the source emits tuples from the
beginning again.
REWIND SOURCE
fails if the source doesn’t support the statement.
Parameters¶
- name
- The name of the source to be rewound.
SAVE STATE¶
Synopsis¶
SAVE STATE name [TAG tag]
Description¶
SAVE STATE
saves a UDS to SensorBee’s storage. The location or the format
of the saved UDS depends on a storage that SensorBee uses and is not
controllable from BQL.
SAVE STATE
fails if the UDS doesn’t support the statement.
Parameters¶
- name
- The name of the UDS to be saved.
- tag
- The name of the user defined tag for versioning of the saved UDS data. When tag is omitted, “default” is used as the default tag name.
Examples¶
To save a UDS named “my_uds” without a tag:
SAVE STATE my_uds;
To save a UDS with a tag “trained”:
SAVE STATE my_uds TAG trained;
SELECT¶
Synopsis¶
SELECT emitter {* | expression [AS output_name]} [, ...]
FROM from_item stream_to_relation_operator
[AS stream_alias] [, ...]
[WHERE condition [, ...]]
[GROUP BY expression [, ...]]
[HAVING condition [, ...]]
[UNION ALL select]
where emitter is:
{RSTREAM | ISTREAM | DSTREAM}
where stream_to_relation_operator is:
'[' RANGE range_number {TUPLES | SECONDS | MILLISECONDS}
[, BUFFER SIZE buffer_size]
[, drop_mode IF FULL]
']'
Description¶
SELECT
retrieves tuples from one or more streams. The general processing of
SELECT
is as follows:
- Each from_item is converted into a relation (i.e. a window) from a stream. Then, each tuple emitted from a from_item is computed within the window. If more than one element is specified in the FROM clause, they are cross-joined.
- When the WHERE clause is specified, condition is evaluated for each set of tuples cross-joined in the FROM clause. Tuples which do not satisfy the condition are eliminated from the output.
- If the GROUP BY clause is specified, tuples satisfying the condition in the WHERE clause are combined into groups based on the result of expressions. Then, aggregate functions are performed on each group. When the HAVING clause is given, it eliminates groups that do not satisfy the given condition.
- The output tuples are computed using the SELECT output expressions for each tuple or group.
- Computed output tuples are converted into a stream using Relation-to-Stream Operators and emitted from the SELECT.
- UNION ALL, if present, combines outputs from multiple SELECT statements. It simply emits all tuples from all SELECT statements without considering duplicates.
Parameters¶
Emitter¶
emitter controls how a SELECT
emits resulting tuples.
RSTREAM
- When
RSTREAM
is specified, all tuples in a relation as a result of processing a newly coming tuple are output. See Relation-to-Stream Operators for more details. ISTREAM
- When
ISTREAM
is specified, tuples contained in the current relation but not in the previously computed relation are emitted. In other words, tuples that are newly inserted or updated since the previous computation are output. See Relation-to-Stream Operators for more details. DSTREAM
- When
DSTREAM
is specified, tuples contained in the previously computed relation but not in the current relation are emitted. In other words, tuples in the previous relation that are deleted or updated in the current relation are output. Note that output tuples are from the previous relation so that they have old values. See Relation-to-Stream Operators for more details.
FROM
Clause¶
The FROM
clause specifies one or more source streams for the SELECT
and converts those streams into relations using stream to relation operators.
The FROM
clause contains following parameters:
- from_item
- from_item is a source stream which can be either a source, a stream, or a UDSF.
- range_number
- range_number is a numeric value which specifies how many tuples are in the window. range_number is followed by one of the interval types: TUPLES, SECONDS, or MILLISECONDS.
When TUPLES is given, range_number must be a positive integer and the window can contain at most range_number tuples. If a new tuple is inserted into the window having range_number tuples, the oldest tuple is removed. "The oldest tuple" is the tuple that was inserted into the window before any other tuples, not the tuple having the oldest timestamp. The maximum range_number is 1048575 with the TUPLES keyword.
When SECONDS or MILLISECONDS is specified, range_number can be a positive number and the difference of the minimum and maximum timestamps of tuples in the window can be at most range_number seconds or milliseconds. If a new tuple is inserted into the window, tuples whose timestamp is range_number seconds or milliseconds earlier than the new tuple's timestamp are removed. The maximum range_number is 86400 with SECONDS and 86400000 with MILLISECONDS.
- buffer_size
- buffer_size specifies the size of the buffer, or queue, located between from_item and the SELECT. buffer_size must be an integer greater than 0. The maximum buffer_size is 131071.
- drop_mode
- drop_mode controls how a new tuple is inserted into the buffer located between from_item and the SELECT when the buffer is full. drop_mode can be one of the following:
WAIT
- A new tuple emitted from from_item is blocked until the SELECT consumes at least one tuple.
DROP OLDEST
- The oldest tuple in the buffer is removed and a new tuple is inserted into the buffer. "The oldest tuple" is the tuple that was inserted into the buffer before any other tuples, not the tuple having the oldest timestamp.
DROP NEWEST
- The newest tuple in the buffer is removed and a new tuple is inserted into the buffer. "The newest tuple" is the tuple that was inserted into the buffer after any other tuples, not the tuple having the newest timestamp.
Note
A buffer is different from a window. A buffer is placed in front of a window. A window gets a new tuple from a buffer and computes a new relation. A buffer is used to avoid blocking the emission of tuples so that multiple SELECT statements can work concurrently without waiting for their receivers to consume tuples.
- stream_alias
- stream_alias provides an alias of from_item, and from_item can be referred to by the alias in other parts of the SELECT. If the alias is given, the original name is hidden and cannot be used to refer to from_item.
Fields of tuples can be referred to by <field_name>
or
<stream>:<field_name>
in other clauses and the SELECT
list. For
example, when the SELECT
has FROM strm [RANGE 1 TUPLES]
and strm
emits {"a":<some value>}
, the field a
can be referred to by a
or
strm:a
. These two forms cannot be mixed in a SELECT
statement.
The form <stream>:<field_name>
is required when the FROM
clause has
multiple input streams.
WHERE
Clause¶
The SELECT
can optionally have a WHERE
clause. The WHERE
clause
has a condition. The condition can be any expression that evaluates to a
result of type bool
. Any tuple that does not satisfy the condition
(i.e. the result of the expression is false
) will be eliminated from the
output.
Operators describes operators that can be used in the condition.
GROUP BY
Clause¶
The GROUP BY
clause is an optional clause and condenses into a single
tuple all selected tuples whose expressions specified in GROUP BY
clause
result in the same value.
expression can be any expression using fields of an input tuple. When there’re multiple expressions in the clause, tuples having the same set of values computed from those expressions are grouped into a single tuple.
When the GROUP BY
clause is present, any ungrouped field cannot be used
as an output field without aggregate functions. For example, when tuples have
4 fields a
, b
, c
, and d
, and the GROUP BY
clause has
following expressions:
GROUP BY a, b + c
a
can only be used as an output field:
SELECT a FROM stream [RANGE 1 TUPLES]
GROUP BY a, b + c;
Other fields need to be specified in aggregate functions:
SELECT a, max(b), min(b + c), avg(c * d) FROM stream [RANGE 1 TUPLES]
GROUP BY a, b + c;
Aggregate functions are evaluated for each group using all tuples in the group.
Note
The GROUP BY
clause performs grouping within a window:
SELECT a FROM stream [RANGE 10 TUPLES]
GROUP BY a;
This SELECT
computes at most 10 groups of tuples because there’re only
10 tuples in the window.
HAVING
Clause¶
The HAVING
clause is an optional clause placed after the GROUP BY clause. The HAVING clause has a condition and evaluates it for each group,
instead of each tuple. When ungrouped fields are used in the condition, they
need to be in aggregate functions:
SELECT a, max(b), min(b + c), avg(c * d) FROM stream [RANGE 1 TUPLES]
GROUP BY a, b + c HAVING min(b + c) > 1 AND avg(c * d) < 10;
In this example, b
, c
, and d
are ungrouped fields and cannot
be specified directly in the condition.
SELECT
List¶
The SELECT
list, placed between the emitter and the FROM
clause,
defines the form of the output tuples emitted from the SELECT
statement.
Each item in the list can be any expression. Each item (i.e. output field)
will have a name. When an expression only consists of a field name, the output
name of the expression will be the field name. For example, the output name
of strm:price
in SELECT RSTREAM strm:price FROM ...
will be price
,
not strm:price
. When the expression is a UDF call, the name of the UDF
will be used as the name of the output field. For example, the result of
count(*)
is named as count
. If an expression is other than a field
name or a UDF call, the output name will be col_n
where n
is replaced
with the number corresponding to the n-th expression (counting from 0). The output
field name can manually be specified by AS output_name
.
When the expression is *
, all fields which have not been specified in the
SELECT
list yet will be included. Output names of those fields will be
identical to the original field names.
If an expression results in a map, its output name can be AS *
. In such a case, all fields of the map are expanded into top-level fields. For example,
in SELECT RSTREAM a, b AS *, c FROM strm ...
, when strm emits tuples
having
{
"a": v1,
"b": {
"d": v3,
"e": v4
},
"c": v2,
}
to the SELECT
, its output will be
{
"a": v1,
"c": v2,
"d": v3,
"e": v4
}
When some fields have the same name, only one of them will be included in the result. It is undefined which field will be chosen as a result.
Notes¶
An Emitter and Its Performance¶
There’re some use case specific optimizations of the evaluation of the
SELECT
and this subsection describes each optimization and its limitation.
Performing a simple per-tuple transformation or filtering over an input stream is a very common task. Therefore, BQL optimizes statements having the following form:
SELECT RSTREAM projection FROM input [RANGE 1 TUPLES] WHERE condition;
Limitations of this optimization are:
- There can only be one input stream and its range is
[RANGE 1 TUPLES]
. - The emitter must be
RSTREAM
.
Evaluation in WHERE
Clause¶
Each set of tuples cross-joined in the FROM
clause is evaluated exactly once
in the WHERE
clause. Therefore, all functions in the WHERE
clause are
only called once for each set:
SELECT RSTREAM * FROM stream1 [RANGE 100 TUPLES], stream2 [RANGE 100 TUPLES]
WHERE random() < 0.2;
In this example, 80% of sets of cross-joined tuples are filtered out and only 20% of sets (around 20 tuples for each input from either stream) are emitted.
Commands¶
build_sensorbee¶
Because SensorBee is written in Go, all its dependencies including plugins
generally need to be statically linked, or at least cannot be dynamically
loaded at runtime. The build_sensorbee
command is provided to support
building a custom sensorbee
command.
Basic Usage¶
Prepare build.yaml
configuration file and run build_sensorbee
in the
same directory as build.yaml
is located at:
$ ls
build.yaml
$ build_sensorbee
sensorbee_main.go
$ ls
build.yaml
sensorbee
sensorbee_main.go
sensorbee
is the result executable file and sensorbee_main.go
is a
Go file generated by build_sensorbee
and passed for go build
command
to build sensorbee
.
Configuration¶
build_sensorbee
requires a configuration file named build.yaml
. The file
is written in YAML and has following optional sections:
plugins
commands
plugins
¶
The plugins
section is optional and may have a list of plugins as follows:
plugins:
- github.com/sensorbee/twitter/plugin
- github.com/sensorbee/fluentd/plugin
- github.com/sensorbee/nlp/plugin
- github.com/sensorbee/tutorial/ml/plugin
- github.com/sensorbee/jubatus/classifier/plugin
A plugin must be provided as a valid import path of Go. A path depends on each plugin.
commands
¶
The commands
section is optional and is used to customize subcommands that
the sensorbee
command will have. By default, or when the section is empty,
subcommands include all standard commands:
run
runfile
shell
topology
commands
section is a map whose keys are the names of subcommands. Standard
subcommands like run
can be added by providing empty entries:
commands:
run:
shell:
With this configuration, the sensorbee
command will only have run
and
shell
commands.
To add a custom command, an entry must have path
parameter that is a Go
import path of the command:
commands:
run:
shell:
mytest:
path: "path/to/sb-mytest"
With this configuration, the sensorbee
command will also have the mytest
subcommand. The subcommand is implemented at path/to/sb-mytest
.
Names of commands must be unique and cannot be any of:
cli
os
version
time
Prohibited names might be added in the future version.
Custom Subcommand Development¶
A custom subcommand for the sensorbee
command can be developed as a Go
package. Only thing the package has to do is to provide a function
func SetUp() cli.Command
. cli
is gopkg.in/urfave/cli.v1
.
A minimum example is provided in
the SensorBee tutorial repository:
package hello
import (
"fmt"
cli "gopkg.in/urfave/cli.v1"
)
func SetUp() cli.Command {
return cli.Command{
Name: "hello",
Usage: "say hello",
Action: func(c *cli.Context) error {
fmt.Println("hello")
return nil
},
}
}
This command prints “hello” when sensorbee hello is executed. See
https://github.com/urfave/cli to learn how to create a command using the
cli
library.
A Complete Example¶
plugins:
- github.com/sensorbee/twitter/plugin
- github.com/sensorbee/fluentd/plugin
- github.com/sensorbee/nlp/plugin
- github.com/sensorbee/tutorial/ml/plugin
- github.com/sensorbee/jubatus/classifier/plugin
commands:
run:
runfile:
shell:
topology:
hello:
path: "github.com/sensorbee/tutorial/custom_command/hello"
Flags and Options¶
--config path
or -c path
This option specifies the path to the configuration file to be used. Its default value is
build.yaml
. With this option, a configuration file in another directory can be used as follows:
$ build_sensorbee -c /path/to/dir/special_build.yaml
--download-plugins={true|false}
This option have to be
true
orfalse
. When the value istrue
,build_sensorbee
downloads (i.e.go get
) all plugins listed inbuild.yaml
. When it’sfalse
,build_sensorbee
doesn’t download plugins and tries to used plugins as installed in the environment. The default value istrue
.Specifying
false
is useful when the customsensorbee
command needs to depend on a plugin that is in a special git branch or locally modified.
--help
or -h
When this flag is given, the command shows the usage of itself and exits without doing anything.
--only-generate-source
When this flag is given,
build_sensorbee
doesn’t build asensorbee
command but only generate a source code that can be built bygo build
command. For example:$ build_sensorbee --only-generate-source sensorbee_main.go $ go build -o sensorbee sensorbee_main.go
--out executable_name
or -o executable_name
This option customizes the name of the output executable file. The default is
sensorbee.exe
in Windows andsensorbee
in all other environment. The following command generates an executable namedmy_sensorbee
instead ofsensorbee
:$ build_sensorbee -o my_sensorbee
--source-filename filename
The filename of the Go source code file automatically generated by
build_sensorbee
can be specified by this option. The default value issensorbee_main.go
.$ build_sensorbee --source-filename custom_main.goBy executing this command,
custom_main.go
is generated instead ofsensorbee_main.go
.
--version
or -v
When this flag is given, the command prints the version of thebuild_sensorbee
command.
sensorbee¶
sensorbee
is the main command to manipulate SensorBee. sensorbee
consists of a set of subcommands described below.
sensorbee
command needs to be created by the build_sensorbee command, and
all the example commands are written as ./sensorbee
to emphasize that
there’s no default sensorbee
command.
See each command’s reference for details.
Flags and Options¶
--help
or -h
When this flag is given, the command prints the usage of itself.
--version
or -v
The command prints the version of SensorBee.
sensorbee run¶
sensorbee run
runs the SensorBee server that manages multiple topologies
that can be dynamically modified at runtime.
Basic Usage¶
$ ./sensorbee run -c sensorbee.yaml
Configuration¶
A configuration file can optionally be provided for sensorbee run
command.
The file is written in YAML and has following optional
sections:
logging
network
storage
topologies
logging
¶
The logging
section customizes behavior of the logger. It has following
optional parameters:
target
The
target
parameter changes the destination of log messages. Its value can be one of the following:
- stdout: write log messages to stdout
- stderr: write log messages to stderr
- file path: a path to the log file
When the value is neither stdout nor stderr, it's considered to be a file path. The default value of this parameter is stderr.
min_log_level
This option specifies the minimum level (severity) of log messages to be written. Valid values are one of debug, info, warn, warning, error, or fatal. warn can also be warning. When debug is given, all levels of messages will be written into the log. When the value is error, only log messages with error or fatal level will be written. The default value of this parameter is info.
log_dropped_tuples
The SensorBee server can print a log message and the contents of tuples when they're dropped from a topology. When this option is true, the server writes log messages reporting dropped tuples. When it's false, the server doesn't. The default value of this option is false.
log_destinationless_tuples
A destinationless tuple is a kind of dropped tuple that occurs when a source or a stream doesn't have any destination and therefore drops a tuple. By setting this option to true, the server reports all destinationless tuples. The default value of this option is false. Note that, to log destinationless tuples, the log_dropped_tuples option also needs to be true.
summarize_dropped_tuples
This option turns summarization of dropped tuple logging on or off. Valid values for this option are true or false. When its value is true, dropped tuples are summarized in log messages.
Note
At the current version, only blob fields are summarized to "(blob)". Other configuration parameters will be supported in a future version, such as the maximum number of fields, the maximum depth of maps, the maximum length of arrays, and so on.
When false is specified, each log message shows a complete JSON representation that is compatible with the original tuple. Although this is useful for debugging, tuples containing large binary data like images may consume a large amount of disk space.
The default value of this option is false.
Example:
logging:
target: /path/to/sensorbee.log
min_log_level: info
log_dropped_tuples: true
summarize_dropped_tuples: true
network
¶
The network
section has parameters related to server’s network
configuration. It has following optional parameters:
listen_on
This parameter controls how the server exposes its listening port. The syntax of the value is host:port. host can be an IP address such as 0.0.0.0 or 127.0.0.1. When host is given, the server only listens on the interface with the given host address. If the host is omitted, the server listens on all available interfaces, that is, the server accepts connections from any host. The default value of this parameter is ":15601".
Example:
network:
listen_on: ":15601"
storage
¶
The storage
section contains the configuration of storages used for saving
UDSs or other information. It has following optional subsections:
uds
uds
¶The uds
subsection configures the storage for saving and loading UDSs. It
provides following optional parameters:
type
The type name of the storage.in_memory
is used as the default value.
params
params
has subparameter specific to the giventype
.
Currently, following types are available:
in_memory
fs
Descriptions of types and parameters are provided below:
in_memory
in_memory
saves UDSs in memory. It loses all saved data when the server restarts. This type doesn't have any parameters.
Example:
storage:
  uds:
    type: in_memory
fs
fs
saves UDSs in the local file system. It has following required parameters:
dir
dir
has the path to the directory in which saved data will be stored.
fs
also has following optional parameters:
temp_dir
temp_dir
has the path to the temporary directory that is used while the UDS writes data. After the UDS has written all the data, the file is moved to the directory specified by the dir parameter. The same value as dir is used by default.
The file name of each saved UDS is formatted as <topology>-<name>-<tag>.state.
Example:
storage:
  uds:
    type: fs
    params:
      dir: /path/to/uds_dir
      temp_dir: /tmp
topologies
¶
The topologies
section contains the configuration of topologies in the
following format:
topologies:
name_of_topology1:
... configuration for name_of_topology1 ...
name_of_topology2:
... configuration for name_of_topology2 ...
name_of_topology3:
... configuration for name_of_topology3 ...
... other topologies ...
Topologies listed in this section will be created at the startup of the server based on the sub-configuration of each topology. The following optional configuration parameters are provided for each topology:
bql_file
This parameter has the path to the file containing BQL statements for the topology. All statements are executed before the server gets ready. If the execution fails, the server exits with an error.
Example:
$ ls
my_topology.bql
sensorbee.yaml
$ cat my_topology.bql
CREATE SOURCE fluentd TYPE fluentd;
CREATE STREAM users AS
SELECT RSTREAM users FROM fluentd [RANGE 1 TUPLES];
CREATE SINK user_file TYPE file WITH path = "users.jsonl";
$ cat sensorbee.yaml
topologies:
my_topology:
bql_file: my_topology.bql
$ ./sensorbee run -c sensorbee.yaml
As a result of these commands above, the server started with sensorbee.yaml
has a topology named my_topology
. The topology has three nodes: fluentd
,
users
, and user_file
.
Note
This is the only way to persist the configuration of topologies at the
moment. Any updates applied at runtime will not be reflected into the bql file.
For example, if the server restarts after creating a new stream in
my_topology
, the new stream will be lost unless it’s explicitly added
to my_topology.bql
manually.
The configuration of a topology can be empty:
topologies:
my_empty_topology:
In this case, an empty topology my_empty_topology
will be created so that
the sensorbee topology create
command doesn’t have to be executed every
time the server restarts.
A Complete Example¶
logging:
target: /path/to/sensorbee.log
min_log_level: info
log_dropped_tuples: true
summarize_dropped_tuples: true
network:
listen_on: ":15601"
storage:
uds:
type: fs
params:
dir: /path/to/uds_dir
temp_dir: /tmp
topologies:
empty_topology:
my_topology:
bql_file: /path/to/my_topology.bql
Flags and Options¶
--config path
or -c path
This option receives the path of the configuration file. By default, the value is empty and no configuration file is used. This value can also be passed throughSENSORBEE_CONFIG
environment variable.
--help
or -h
When this flag is given, the command prints the usage of itself.
sensorbee runfile¶
sensorbee runfile
runs a single BQL file. This command is mainly designed
for offline data processing but can be used as a standalone SensorBee process
that doesn’t expose any interface to manipulate the topology.
sensorbee runfile
stops after all the nodes created by the given BQL file stop. The command doesn't stop if the file contains a source that generates infinite tuples or is rewindable. Other non-rewindable sources, such as a file source that stops after emitting all the tuples written in a file, work well with the command.
Sources generally need to be created with the PAUSED keyword in the CREATE SOURCE statement. Without PAUSED, a source can start emitting tuples before all nodes in a topology are correctly set up.
Therefore, a BQL file passed to the command should look like:
CREATE PAUSED SOURCE source_1 TYPE ...;
CREATE PAUSED SOURCE source_2 TYPE ...;
...
CREATE PAUSED SOURCE source_n TYPE ...;
... CREATE STREAM, CREATE SINK, or other statements
RESUME SOURCE source_1;
RESUME SOURCE source_2;
...
RESUME SOURCE source_n;
With the --save-uds
option described later, it saves UDSs at the end of its
execution.
Basic Usage¶
$ ./sensorbee runfile my_topology.bql
With options:
$ ./sensorbee runfile -c sensorbee.yaml -s '' my_topology.bql
Configuration¶
sensorbee runfile accepts the configuration file for sensorbee run. It only uses the logging and storage sections. The configuration file may contain other sections as well, so the same file used for sensorbee run can also be used for sensorbee runfile. See the configuration of sensorbee run for details.
Flags and Options¶
--config path
or -c path
This option receives the path of the configuration file. By default, the value is empty and no configuration file is used. This value can also be passed through the SENSORBEE_CONFIG environment variable.
--help
or -h
When this flag is given, the command prints its usage.
--save-uds udss
or -s udss
This option receives a comma-separated list of UDS names. The UDSs listed will be saved at the end of execution. For example, when the option is -s "a,b,c", UDSs named a, b, and c will be saved. To save all UDSs in a topology, pass an empty string: -s "". By default, no UDSs are saved at the end of execution.
--topology name
or -t name
This option changes the name of the topology to be run with the given BQL file. The default name is taken from the file name of the BQL file. The name specified with this option is used in log messages and in saved UDS data. In particular, the names of files containing saved UDS data include the name of the topology. Therefore, providing the same name as the topology that will later be run by sensorbee run allows users to prepare UDSs, including pre-trained machine learning models, in advance.
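For example, the following workflow is a minimal sketch of that idea. It assumes a hypothetical create_model.bql file that creates and trains a UDS in a topology named my_topology, and a sensorbee.yaml that defines a topology with the same name; both file names are illustrative:
$ ./sensorbee runfile -t my_topology -s '' create_model.bql
$ ./sensorbee run -c sensorbee.yaml
The first command runs the BQL file under the topology name my_topology and saves all its UDSs at the end of execution, so that the saved UDS data is associated with the my_topology topology served by sensorbee run afterwards.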
sensorbee shell or bql¶
sensorbee shell or bql starts a new shell to manipulate the SensorBee server. The shell can be terminated by typing exit or C-d.
Both sensorbee shell and bql have the same interface, but bql is installed by default, while the sensorbee command needs to be built manually to run sensorbee shell.
Basic Usage¶
To run sensorbee shell:
$ ./sensorbee shell -t my_topology
my_topology>
To run bql:
$ bql -t my_topology
my_topology>
Flags and Options¶
--api-version version
This option changes the API version of the SensorBee server. The default value of this option is v1.
--help
or -h
When this flag is given, the command prints its usage.
--topology name
or -t name
The name of a topology to be manipulated can be specified through this option so that USE topology_name doesn't have to be used in the shell (see the sketch after this list). The default value is an empty name, that is, no topology is specified.
--uri
This option is used when the SensorBee server is running on a host other than localhost or is listening on a non-default port (the default is 15601). The value should have a format like http://host:port/. The default value of this option is http://localhost:15601/.
sensorbee topology¶
sensorbee topology
, or sensorbee t
, is used to manipulate topologies on
the SensorBee server.
Note
This command is provided because the syntax of BQL statements that control topologies has not been discussed enough yet.
The command consists of the following subcommands:
sensorbee topology create <name>
or sensorbee t c <name>
This command creates a new topology on the SensorBee server. The <name> argument is the name of the topology to be created. $? will be 0 if the command is successful. Otherwise, it'll be non-zero. The command fails if the topology already exists on the server.
sensorbee topology drop <name>
or sensorbee t drop <name>
This command drops an existing topology on the SensorBee server. The <name> argument is the name of the topology to be dropped. $? will be 0 if the command is successful. Otherwise, it'll be non-zero. The command doesn't fail even if the topology doesn't exist on the server.
sensorbee topology list
or sensorbee t l
This command prints the names of all topologies on the SensorBee server, one name per line.
All commands share the same flags and options. Flags and options need to be given after the subcommand name:
$ ./sensorbee topology create --flag --option value my_topology
In this example, a flag --flag and an option --option value are provided. The argument of the command, i.e. the name of the topology, is my_topology.
Flags and Options¶
--api-version version
This option changes the API version of the SensorBee server. The default value of this option is v1.
--help
or -h
When this flag is given, the command prints its usage.
--uri
This option is used when the SensorBee server is running on a host other than localhost or is listening on a non-default port (the default is 15601). The value should have a format like http://host:port/. The default value of this option is http://localhost:15601/.
Function Reference¶
Common Mathematical Functions¶
For the functions below, if a given parameter is outside the mathematically valid range for that function (e.g., sqrt(-2), log(0), div(2.0, 0.0)) and the return type is float, then NaN is returned.
However, if the return type is int (e.g., div(2, 0)), there is no NaN option and an error will occur instead.
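The difference can be checked interactively with EVAL statements in the bql shell. The following is a minimal sketch (BQL uses SQL-style -- comments):
EVAL sqrt(-2.0);    -- float return type: evaluates to NaN
EVAL div(2.0, 0.0); -- float return type: evaluates to NaN
EVAL div(2, 0);     -- int return type: raises a runtime error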
abs¶
abs(x)
Description¶
abs computes the absolute value of a number.
Parameter Types¶
x
int or float
Return Type¶
same as input
Examples¶
Function Call | Result |
---|---|
abs(-17.4) | 17.4 |
cbrt¶
cbrt(x)
Description¶
cbrt computes the cube root of a number.
Parameter Types¶
x
int or float
Return Type¶
float
Examples¶
Function Call | Result |
---|---|
cbrt(27.0) | 3.0 |
cbrt(-3) | -1.4422495703074083 |
ceil¶
ceil(x)
Description¶
ceil computes the smallest integer not less than its argument.
Parameter Types¶
x
int or float
Return Type¶
same as input
The return type is float for float input in order to avoid problems with input values that are too large for the int data type.
Examples¶
Function Call | Result |
---|---|
ceil(1.3) | 2.0 |
ceil(-1.7) | -1.0 |
degrees¶
degrees(x)
Description¶
degrees converts radians to degrees.
Parameter Types¶
x
int or float
Return Type¶
float
Examples¶
Function Call | Result |
---|---|
degrees(3.141592653589793) | 180.0 |
div¶
div(y, x)
Description¶
div computes the integer quotient y/x of two numbers y and x.
If x is 0.0 (float) then NaN will be returned; if it is 0 (integer) then a runtime error will occur.
Parameter Types¶
y
int or float
x
same as y
Return Type¶
same as input
Examples¶
Function Call | Result |
---|---|
div(9, 4) | 2 |
div(9.3, 4.5) | 2.0 |
exp¶
exp(x)
Description¶
exp computes the exponential of a number.
Parameter Types¶
x
int or float
Return Type¶
float
Examples¶
Function Call | Result |
---|---|
exp(1.0) | 2.718281828459045 |
floor¶
floor(x)
Description¶
floor computes the largest integer not greater than its argument.
Parameter Types¶
x
int or float
Return Type¶
same as input
The return type is float for float input in order to avoid problems with input values that are too large for the int data type.
Examples¶
Function Call | Result |
---|---|
floor(1.3) | 1.0 |
floor(-1.7) | -2.0 |
ln¶
ln(x)
Description¶
ln computes the natural logarithm of a number.
If the parameter is not strictly positive, NaN is returned.
Parameter Types¶
x
int or float
Return Type¶
float
Examples¶
Function Call | Result |
---|---|
ln(2) | 0.6931471805599453 |
log¶
log(x)
log(b, x)
Description¶
log computes the logarithm of a number x to base b (default: 10).
Parameter Types¶
x
int or float
b (optional)
same as x
Return Type¶
float
Examples¶
Function Call | Result |
---|---|
log(100) | 2.0 |
log(2.5, 6.25) | 2.0 |
log(2, 8) | 3.0 |
mod¶
mod(y, x)
Description¶
mod computes the remainder of integer division y/x of two numbers y and x.
If x is 0.0 (float) then NaN will be returned; if it is 0 (integer) then a runtime error will occur.
Parameter Types¶
y
int or float
x
same as y
Return Type¶
same as input
Examples¶
Function Call | Result |
---|---|
mod(9, 4) | 1 |
mod(9.3, 4.5) | 0.3 |
pi¶
pi()
Description¶
pi returns the π constant (more or less 3.14).
Return Type¶
float
Examples¶
Function Call | Result |
---|---|
pi() | 3.141592653589793 |
power¶
power(a, b)
Description¶
power computes a raised to the power of b.
Parameter Types¶
a
int or float
b
same as a
Return Type¶
float
The return type is float even for integer input in order to have a uniform behavior for cases such as power(2, -2).
Examples¶
Function Call | Result |
---|---|
power(9.0, 3.0) | 729.0 |
power(2, -1) | 0.5 |
radians¶
radians(x)
Description¶
radians converts degrees to radians.
Parameter Types¶
x
int or float
Return Type¶
float
Examples¶
Function Call | Result |
---|---|
radians(180) | 3.141592653589793 |
round¶
round(x)
Description¶
round computes the nearest integer of a number.
Parameter Types¶
x
int or float
Return Type¶
same as input
The return type is float for float input in order to avoid problems with input values that are too large for the int data type.
Examples¶
Function Call | Result |
---|---|
round(1.3) | 1.0 |
round(0.5) | 1.0 |
round(-1.7) | -2.0 |
sign¶
sign(x)
Description¶
sign returns the sign of a number: 1 for positive numbers, -1 for negative numbers, and 0 for zero.
Parameter Types¶
x
int or float
Return Type¶
int
Examples¶
Function Call | Result |
---|---|
sign(2) | 1 |
sqrt¶
sqrt(x)
Description¶
sqrt computes the square root of a number.
If the parameter is negative, NaN is returned.
Parameter Types¶
x
int or float
Return Type¶
float
Examples¶
Function Call | Result |
---|---|
sqrt(2) | 1.4142135623730951 |
trunc¶
trunc(x)
Description¶
trunc computes the truncated integer (towards zero) of a number.
Parameter Types¶
x
int or float
Return Type¶
same as input
The return type is float for float input in order to avoid problems with input values that are too large for the int data type.
Examples¶
Function Call | Result |
---|---|
trunc(1.3) | 1.0 |
trunc(-1.7) | -1.0 |
width_bucket¶
width_bucket(x, left, right, count)
Description¶
width_bucket computes the bucket to which x would be assigned in a histogram with count equal-width buckets covering the range \([\text{left}, \text{right})\).
Points on a bucket border belong to the right bucket.
Points outside of the \([\text{left}, \text{right})\) range have bucket number \(0\) and \(\text{count}+1\), respectively.
Parameter Types¶
x
int or float
left
int or float
right
int or float
count
int
Return Type¶
int
Examples¶
Function Call | Result |
---|---|
width_bucket(5, 0, 10, 5) | 3 |
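In a stream context, width_bucket can be used to annotate each tuple with the bucket its value falls into. The following is a minimal sketch; the sensor_readings stream and its temperature field are hypothetical:
CREATE STREAM temperature_buckets AS
    SELECT RSTREAM temperature,
           width_bucket(temperature, 0, 50, 10) AS bucket
    FROM sensor_readings [RANGE 1 TUPLES];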
Pseudo-Random Functions¶
The characteristics of the functions below are equal to those of the Go math/rand package. They are not suitable for cryptographic applications.
random¶
random()
Description¶
random returns a pseudo-random number in the range \(0.0 \le x < 1.0\).
This function is not safe for use in cryptographic applications. See the Go math/rand package for details.
Return Type¶
float
Examples¶
Function Call | Result |
---|---|
random() | 0.6046602879796196 |
setseed¶
setseed(x)
Description¶
setseed initializes the seed for subsequent random() calls.
The parameter must be in the range \(-1.0 \le x \le 1.0\).
This function is not safe for use in cryptographic applications. See the Go math/rand package for details.
Parameter Types¶
x
float
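random can be used, for example, to sample a fraction of the tuples in a stream. The following is a minimal sketch that passes through roughly 10% of the tuples; the sensor_readings stream is hypothetical:
CREATE STREAM sampled_readings AS
    SELECT RSTREAM * FROM sensor_readings [RANGE 1 TUPLES]
    WHERE random() < 0.1;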
Trigonometric Functions¶
All trigonometric functions take arguments and return values of type float.
Arguments to trigonometric functions are expressed in radians. Return values of inverse functions are expressed in radians.
String Functions¶
bit_length¶
bit_length(s)
Description¶
bit_length computes the number of bits in a string s.
Note that due to UTF-8 encoding, this is equal to octet_length(s) * 8, not necessarily char_length(s) * 8.
Parameter Types¶
s
string
Return Type¶
int
Examples¶
Function Call | Result |
---|---|
bit_length("über") | 40 |
btrim¶
btrim(s)
btrim(s, chars)
Description¶
btrim removes the longest string consisting only of characters in chars (default: whitespace) from the start and end of s.
Parameter Types¶
s
string
chars (optional)
string
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
btrim(" trim ") | "trim" |
btrim("xyxtrimyyx", "xy") | "trim" |
char_length¶
char_length(s)
Description¶
char_length computes the number of characters in a string.
Parameter Types¶
s
string
Return Type¶
int
Examples¶
Function Call | Result |
---|---|
char_length("über") | 4 |
concat¶
concat(s [, ...])
Description¶
concat concatenates all strings given as input arguments.
NULL values are ignored, i.e., treated like an empty string.
Parameter Types¶
s and all subsequent parameters
string
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
concat("abc", NULL, "22") | "abc22" |
concat_ws¶
concat_ws(sep, s [, ...])
Description¶
concat_ws concatenates all strings given as input arguments s using the separator sep.
NULL values are ignored.
Parameter Types¶
sep
string
s and all subsequent parameters
string
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
concat_ws(":", "abc", NULL, "22") | "abc:22" |
format¶
format(s, [x, ...])
Description¶
format formats a variable number of arguments x according to a format string s.
See the Go package fmt for details of what formatting codes are allowed.
Parameter Types¶
s
string
x and all subsequent parameters (optional)
any
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
format("%s-%d", "abc", 22) | "abc-22" |
lower¶
lower(s)
Description¶
lower converts a string s to lower case.
Non-ASCII Unicode characters are mapped to their lower case, too.
Parameter Types¶
s
string
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
lower("ÜBer") | "über" |
ltrim¶
ltrim(s)
ltrim(s, chars)
Description¶
ltrim removes the longest string consisting only of characters in chars (default: whitespace) from the start of s.
Parameter Types¶
s
string
chars (optional)
string
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
ltrim(" trim ") | "trim " |
ltrim("xyxtrimyyx", "xy") | "trimyyx" |
md5¶
md5(s)
Description¶
md5 computes the MD5 checksum of a string s and returns it in hexadecimal format.
Parameter Types¶
s
string
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
md5("abc") | "900150983cd24fb0d6963f7d28e17f72" |
octet_length¶
octet_length(s)
Description¶
octet_length computes the number of bytes in a string s.
Note that due to UTF-8 encoding, this may differ from the number returned by char_length.
Parameter Types¶
s
string
Return Type¶
int
Examples¶
Function Call | Result |
---|---|
octet_length("über") | 5 |
overlay¶
overlay(s, repl, from)
overlay(s, repl, from, for)
Description¶
overlay replaces for characters in a string s with the string repl, starting at from.
(Index counting starts at 0.)
If for is not given, the length of repl is used as a default.
Parameter Types¶
s
string
repl
string
from
int
for (optional)
int
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
overlay("Txxxxas", "hom", 1) | "Thomxas" |
overlay("Txxxxas", "hom", 1, 4) | "Thomas" |
rtrim¶
rtrim(s)
rtrim(s, chars)
Description¶
rtrim removes the longest string consisting only of characters in chars (default: whitespace) from the end of s.
Parameter Types¶
s
string
chars (optional)
string
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
rtrim(" trim ") | " trim" |
rtrim("xyxtrimyyx", "xy") | "xyxtrim" |
sha1¶
sha1(s)
Description¶
sha1 computes the SHA1 checksum of a string s and returns it in hexadecimal format.
Parameter Types¶
s
string
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
sha1("abc") | "a9993e364706816aba3e25717850c26c9cd0d89d" |
sha256¶
sha256(s)
Description¶
sha256 computes the SHA256 checksum of a string s and returns it in hexadecimal format.
Parameter Types¶
s
string
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
sha256("abc") | "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad" |
strpos¶
strpos(s, t)
Description¶
strpos returns the index of the first occurrence of t in s (index counting starts at 0) or -1 if it is not found.
Parameter Types¶
s
string
t
string
Return Type¶
int
Examples¶
Function Call | Result |
---|---|
strpos("high", "ig") | 1 |
substring¶
substring(s, r)
substring(s, from)
substring(s, from, for)
Description¶
substring(s, r) extracts the substring matching regular expression r from s.
See the Go regexp package for details of matching.
substring(s, from, for) returns the for characters of s starting from the from index.
(Index counting starts at 0.)
If for is not given, everything until the end of s is returned.
Which of those behaviors is used depends on the type of the second parameter (int or string).
Parameter Types¶
s
string
r
string
from
int
for (optional)
int
Return Type¶
string
Examples¶
Function Call | Result |
---|---|
substring("Thomas", "...$") | "mas" |
substring("Thomas", 1) | "homas" |
substring("Thomas", 1, 3) | "hom" |
Time Functions¶
distance_us¶
distance_us(u, v)
Description¶
distance_us computes the signed temporal distance from u to v in microseconds.
Parameter Types¶
u
timestamp
v
timestamp
Return Type¶
int
Examples¶
Function Call | Result |
---|---|
distance_us("2016-02-09T05:40:25.123Z"::timestamp, "2016-02-09T05:41:25.456Z"::timestamp) | 60333000 |
distance_us(clock_timestamp(), clock_timestamp()) | 2 |
clock_timestamp¶
clock_timestamp()
Description¶
clock_timestamp returns the current date and time in UTC.
Return Type¶
timestamp
now¶
now()
Description¶
now returns the date and time in UTC of the point in time when processing of the current tuple started.
In particular, and as opposed to clock_timestamp, the timestamp returned by now() does not change during a processing run triggered by the arrival of a tuple.
For example, in
SELECT RSTREAM clock_timestamp() AS a, clock_timestamp() AS b,
  now() AS c, now() AS d FROM ...
the values of a and b are most probably different by a very short timespan, but c and d are equal by definition of now().
now cannot be used in an EVAL statement outside of a stream processing context.
Return Type¶
timestamp
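Combining now and clock_timestamp with distance_us gives a simple way to measure how long the current tuple has been processed so far. The following is a minimal sketch; the sensor_readings stream is hypothetical:
CREATE STREAM processing_delay AS
    SELECT RSTREAM distance_us(now(), clock_timestamp()) AS delay_us
    FROM sensor_readings [RANGE 1 TUPLES];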
Array Functions¶
Other Scalar Functions¶
Aggregate Functions¶
Aggregate functions compute a single result from a set of input values.
It should be noted that except for count, these functions return a NULL value when no rows are selected. In particular, sum of no rows returns NULL, not zero as one might expect, and array_agg returns NULL rather than an empty array when there are no input rows. The coalesce function can be used to substitute zero or an empty array for NULL when necessary.
Also note that most aggregate functions ignore singular NULL values in their input, i.e., processing is done as if this row had not been in the input. (One notable exception is the array_agg function that includes input NULL values in its output.)
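As a concrete illustration of the coalesce idiom mentioned above, the following sketch emits 0 instead of NULL when no tuple in the window satisfies the filter condition. The purchases stream and its amount field are hypothetical:
CREATE STREAM large_purchase_totals AS
    SELECT RSTREAM coalesce(sum(amount), 0) AS total
    FROM purchases [RANGE 60 SECONDS]
    WHERE amount > 100;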
array_agg¶
array_agg(x)
Description¶
array_agg returns an array containing all input values, including NULL values.
There is no guarantee on the order of items in the result. Use the ORDER BY clause to achieve a certain ordering.
Parameter Types¶
x
any
Return Type¶
array
avg¶
avg(x)
Description¶
avg computes the average (arithmetic mean) of all input values.
Parameter Types¶
x
int or float (mixed types are allowed)
Return Type¶
float
bool_and¶
bool_and(x)
Description¶
bool_and returns true if all input values are true, otherwise false.
Parameter Types¶
x
bool
Return Type¶
bool
bool_or¶
bool_or(x)
Description¶
bool_or returns true if at least one input value is true, otherwise false.
Parameter Types¶
x
bool
Return Type¶
bool
count¶
count(x)
count(*)
Description¶
count returns the number of input rows for which x is not NULL, or the number of total rows if * is passed.
Parameter Types¶
x
any
Return Type¶
int
json_object_agg¶
json_object_agg(k, v)
Description¶
json_object_agg aggregates pairs of key k and value v as a map.
If both key and value are NULL, the pair is ignored. If only the value is NULL, it is still added with the corresponding key. It is an error if only the key is NULL. It is an error if a key appears multiple times.
A map does not have an ordering, therefore there is no guarantee on the result map ordering, whether or not ORDER BY is used.
Parameter Types¶
k
string
v
any
Return Type¶
map
max¶
max(x)
Description¶
max computes the maximum value of all input values.
Parameter Types¶
x
int or float (mixed types are allowed)
Return Type¶
same as largest input value
median¶
median(x)
Description¶
median computes the median of all input values.
Parameter Types¶
x
int or float (mixed types are allowed)
Return Type¶
float
min¶
min(x)
Description¶
min computes the minimum value of all input values.
Parameter Types¶
x
int or float (mixed types are allowed)
Return Type¶
same as smallest input value