Wallaroo v0.6.1 Release Notes
Release Date: 2018-12-31
Release Notes
We're excited to announce the addition of stream windowing to the Wallaroo API. You can now aggregate data in count-based or range-based windows. For example, the following code sample defines 6-second sliding windows starting every 3 seconds for a user-defined aggregation:

    (inputs
        .to(wallaroo.range_windows(wallaroo.seconds(6))
                .with_slide(wallaroo.seconds(3))
                .over(MyAgg))
        .to_sink(sink_config))

This means first-class support for the wide range of streaming use cases that require windowing.
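To make the 6-second/3-second example concrete, here is a small pure-Python sketch (not Wallaroo code) that lists which sliding windows a given event timestamp falls into. It assumes windows are aligned to time 0 and are half-open `[start, end)` intervals; Wallaroo's actual alignment and boundary rules may differ, so check the windowing documentation.

```python
import math


def sliding_windows_for(ts, range_s=6, slide_s=3):
    """Return the [start, end) windows that contain timestamp ts.

    Assumes window starts are multiples of slide_s and each window
    spans range_s seconds (alignment to 0 is an assumption).
    """
    windows = []
    # Latest window start at or before ts, aligned to the slide.
    start = math.floor(ts / slide_s) * slide_s
    # Walk backwards while the window still covers ts.
    while start > ts - range_s:
        windows.append((start, start + range_s))
        start -= slide_s
    return sorted(windows)


print(sliding_windows_for(7.5))  # [(3, 9), (6, 12)]
```

With a 6-second range and a 3-second slide, every timestamp lands in exactly two windows, which is why sliding windows produce overlapping results.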
We've also updated our documentation site! If you run into any problems navigating the new site, feel free to open an issue!
What is Wallaroo
Wallaroo is a modern, extensible framework that makes it simple to get stateful streaming data and event-driven applications to production fast, regardless of scale.
If you are interested in installing Wallaroo, our installation documentation provides the various ways you can get up and running.
Feel free to use the table of contents below to help you navigate to sections you might find relevant.
Table of Contents
- New Features and Improvements
  - Aggregations
  - Windowing API
  - API Update: collect()
- Installing Wallaroo
- Upgrading Wallaroo
  - Upgrading Wallaroo via Wallaroo Up
  - Upgrading Wallaroo in Docker
  - Upgrading Wallaroo in Vagrant
- CHANGELOG
New Features and Improvements
Aggregations
Aggregations are an alternative to state computations that trade some of the freedom provided by state computations for the ability to efficiently compute results in windows. This goes hand-in-hand with our new Windowing API, which is also part of this release.
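One way to see why aggregations can compute windowed results efficiently is the common accumulator pattern sketched below. The class and its method names (`initial_accumulator`, `update`, `combine`, `output`) are illustrative assumptions rather than a copy of Wallaroo's interface. The point is that partial results can be combined, so overlapping windows can share work instead of re-reading every input.

```python
from functools import reduce


class SumAgg:
    """Hypothetical aggregation: running sum of numeric inputs.

    Method names are illustrative; see the Wallaroo docs for the real API.
    """

    def initial_accumulator(self):
        return 0

    def update(self, value, acc):
        # Fold a single input into the accumulator.
        return acc + value

    def combine(self, acc1, acc2):
        # Merge two partial accumulators (e.g. from adjacent window slices).
        return acc1 + acc2

    def output(self, key, acc):
        return (key, acc)


def aggregate(agg, values):
    return reduce(lambda acc, v: agg.update(v, acc), values, agg.initial_accumulator())


agg = SumAgg()
left = aggregate(agg, [1, 2, 3])
right = aggregate(agg, [4, 5])
# Combining partials matches aggregating everything at once.
assert agg.combine(left, right) == aggregate(agg, [1, 2, 3, 4, 5])
print(agg.output("total", agg.combine(left, right)))  # ('total', 15)
```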
Windowing API
Wallaroo now supports windowing over aggregations. This allows you to break an infinite stream into manageable chunks and also allows you to see how your inputs evolve over time.
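The two window families described below can be modeled in plain Python. This sketch is illustrative only. It groups messages into count-based windows of `n` and buckets timestamped events into tumbling (non-overlapping) range windows. Two behaviors are assumptions of the sketch, not documented Wallaroo behavior: an incomplete trailing count window is held back, and range windows are aligned to time 0.

```python
def count_windows(messages, n):
    """Group messages into consecutive count-based windows of n messages.

    An incomplete trailing window is not emitted (an assumption here).
    """
    windows, current = [], []
    for msg in messages:
        current.append(msg)
        if len(current) == n:
            windows.append(current)
            current = []
    return windows


def tumbling_windows(events, range_s):
    """Bucket (timestamp, value) pairs into non-overlapping range windows."""
    buckets = {}
    for ts, value in events:
        start = int(ts // range_s) * range_s  # window start aligned to 0
        buckets.setdefault(start, []).append(value)
    return dict(sorted(buckets.items()))


print(count_windows(list("abcdefg"), 3))  # [['a', 'b', 'c'], ['d', 'e', 'f']]
print(tumbling_windows([(0.5, "a"), (2.9, "b"), (3.0, "c"), (7.2, "d")], 3))
# {0: ['a', 'b'], 3: ['c'], 6: ['d']}
```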
There are two broad categories of windows currently supported: count-based and range-based. Count-based windows emit an output every `n` input messages, where `n` is specified via the API. Range-based windows are based on fixed time ranges and can be further divided into tumbling (non-overlapping) and sliding (overlapping) varieties. See our documentation for an in-depth description of these concepts and our API.

API Update: collect()
We pass an implicit routing key along each stage in a Wallaroo pipeline. This release adds a new API call, `.collect()`, for the case where you want all messages after a certain stage to be assigned the same routing key. Prior to this change, you had to use a `key_by` call with a `key_extractor` function that returned a constant. This was onerous and also limited our options to optimize topology layouts under the hood.

Installing Wallaroo
There are currently three ways that you can install Wallaroo:

- The Wallaroo Up script
- Docker
- Vagrant

Upgrading Wallaroo
In all cases below, if you run into issues, please reach out to us! We're available on Twitter, IRC, GitHub, by email, on our mailing list, or on our subreddit. We love questions!
If you have made no changes to Wallaroo or Pony since installation, your best bet will be to start from scratch, following the instructions of your choice.
Below are instructions for Upgrading Wallaroo via Wallaroo Up, Upgrading Wallaroo in Docker, and Upgrading Wallaroo in Vagrant.
Upgrading Wallaroo via Wallaroo Up
The normal Wallaroo Up installation instructions will install new versions next to existing versions.
Upgrading the Wallaroo Docker image
To upgrade the Wallaroo Docker image, run the following command to get the latest image. If you don't allow a non-root user to run Docker commands, you'll need to add `sudo` to the front of the command.

    docker pull wallaroo-labs-docker-wallaroolabs.bintray.io/release/wallaroo:0.6.1

Upgrading Wallaroo Source Code
If you mounted the Wallaroo source code to your local machine using the directory recommended in setup, `/tmp/wallaroo-docker` (UNIX & MacOS users) or `c:\wallaroo-docker` (Windows users), then you will need to move the existing directory in order to get the latest source code. The latest Wallaroo source code will be copied to this directory automatically when a new container is started with the latest Docker image.
UNIX & MacOS Users
For UNIX & MacOS users, you can move the directory with the following command:

    mv /tmp/wallaroo-docker/wallaroo-src/ /tmp/wallaroo-docker/wallaroo-0.6.0-src/

Windows Users
For Windows users, you can move the directory with the following command:

    move c:\wallaroo-docker\wallaroo-src c:\wallaroo-docker\wallaroo-0.6.0-src

Once done moving, you can re-create the `wallaroo-src` directory with the following command:

    mkdir c:\wallaroo-docker\wallaroo-src

Upgrading Wallaroo in Vagrant
To upgrade your Wallaroo installation in Vagrant, you'll want to follow the latest installation instructions for Wallaroo in Vagrant.
If you have modified your old Vagrant VM in any way that you intend to persist, you should persist your changes now. For example, copy any edited or new files from the old Vagrant VM to the new one.
CHANGELOG
[0.6.1] - 2018-12-31
Added
- Add support for windowing (PR #2735)
Previous changes from v0.6.0
Release Notes
We are excited to announce the release of Wallaroo 0.6.0. The most significant change in this release is a complete overhaul of the Wallaroo API to make it cleaner, simpler, and more intuitive. As a result of these changes, this is a breaking release. We also want to thank GitHub users ChristianWitts and cristaloleg for their contributions to Wallaroo last month!
We would love to hear what you think of the new API and how you plan to use Wallaroo. Please reach out to us! We're available on Twitter, IRC, GitHub, by email, on our mailing list, or on our subreddit. We love questions!
Table of Contents
- New Features and Improvements
  - Python 3 Support for Connectors
  - Connectors API Update
  - Streamlined Wallaroo Python API
    - Converting to the New API
- Installing Wallaroo
- Upgrading Wallaroo
  - Upgrading Wallaroo via Wallaroo Up
  - Upgrading Wallaroo in Docker
  - Upgrading Wallaroo in Vagrant
  - Upgrading Wallaroo when compiled from source
- CHANGELOG
New Features and Improvements
Python 3 Support for Connectors
The Connectors API has been updated to work with Python 3.5 and up, and all of the example connectors have been tested against Python 3.5. Prior to this work, Connectors would only work under Python 2.7.
Connectors API Update
We've made some changes to the connectors API used when defining applications and pipelines to bring it more in line with how other built-in sources and sinks are defined. These changes only affect your `application_setup` code and should not require changes to the connector scripts.
An example application has been updated in this release, and the documentation includes all relevant details if you're getting started. You're encouraged to keep reading the section on the streamlined API below, since it all applies to how your application code should be updated. For quick reference, the new source and sink configuration constructors look like this:

    source_config = wallaroo.experimental.SourceConnectorConfig(
        "source_name",
        encoder=source_encode_function,
        decoder=source_decode_function,
        port=7100)
    sink_config = wallaroo.experimental.SinkConnectorConfig(
        "sink_name",
        encoder=sink_encode_function,
        decoder=sink_decode_function,
        port=7200)

As before, the names must match what you pass to the connector scripts so the right data flows to each part. Ports are now assigned explicitly and should be unique for each connector. These configuration values can be passed to the source and sink pipeline components, respectively. Read on for more information on how to use the streamlined pipeline components.
Streamlined Wallaroo Python API
The original Wallaroo Python API has existed in roughly the same form since September 2017. Based on user feedback and continuous internal experimentation, we decided it was time to streamline the API, both to create a better developer experience and to make it easier to add functionality to Wallaroo in the future. We're going to describe the new API in isolation in this section. If you want to know how to convert from the old API to the new one, see Converting to the New API below.
Defining a Simple Wallaroo Application
A Wallaroo application includes one or more sources. You use `wallaroo.source(...)` to define a stream originating from a source. Each source stream can be followed by one or more computation stages (we will describe how to define computations themselves later on). A linear sequence from a source through zero or more computations constitutes a partial pipeline. For example:

    inputs = wallaroo.source("Source Name", source_config)
    partial_pipeline = inputs.to(my_computation)

This defines a partial pipeline that could be diagrammed as follows:

    Source -> my_computation ->

The hanging arrow at the end of this diagram indicates that the pipeline is partial. We can still add more stages, and to complete the pipeline we need one or more sinks.
You create a complete pipeline by terminating a partial pipeline with a call to `to_sink` or `to_sinks`. For example:

    inputs = wallaroo.source("Source Name", source_config)
    complete_pipeline = (inputs
        .to(my_computation)
        .to_sink(sink_config))

Our pipeline is now complete:

    Source -> my_computation -> Sink

Unless a call to `to` using a stateless computation is preceded by a call to `key_by` (which partitions messages by key), there are no guarantees around the order in which messages will be processed. That's because Wallaroo might parallelize a stateless computation if that is beneficial for scaling. That means the execution graph for the above pipeline could look like this (with a stateless computation in place of `my_computation`):

            /-> my_stateless_computation -\
           /                               \
    Source ----> my_stateless_computation ----> Sink
           \                               /
            \-> my_stateless_computation -/

Some messages will be routed to each of the parallel computation instances. When they merge again at the sink, these messages will be interleaved in a non-deterministic fashion.
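The ordering behavior described above can be simulated with a toy model of parallel instances (plain Python, not Wallaroo's router). It assumes unkeyed messages are spread round-robin and keyed messages are routed by a CRC32 hash of the key. Each instance processes its messages in FIFO order, and the sink sees instance outputs interleaved at random (seeded for repeatability).

```python
import random
import zlib
from collections import defaultdict


def run_parallel(messages, instances, key_fn=None, seed=0):
    """Toy model of a stateless computation running on several instances.

    Without key_fn, messages are spread round-robin. With key_fn, all
    messages for one key go to the same instance (the effect of key_by).
    """
    queues = defaultdict(list)
    for i, msg in enumerate(messages):
        if key_fn is None:
            idx = i % instances
        else:
            idx = zlib.crc32(key_fn(msg).encode("utf-8")) % instances
        queues[idx].append(msg)

    # The sink pulls from whichever instance happens to finish next.
    rng = random.Random(seed)
    pending = [q for q in queues.values() if q]
    delivered = []
    while pending:
        queue = rng.choice(pending)
        delivered.append(queue.pop(0))
        pending = [q for q in pending if q]
    return delivered


msgs = [("a", 1), ("b", 1), ("a", 2), ("b", 2), ("a", 3)]
keyed = run_parallel(msgs, instances=3, key_fn=lambda m: m[0])
print([m for m in keyed if m[0] == "a"])  # [('a', 1), ('a', 2), ('a', 3)]
```

Because every message for a given key goes to the same FIFO instance, per-key order survives even though the global order does not.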
Merging Partial Pipelines
You can merge two partial pipelines to form a new partial pipeline. For example:

    inputs1 = wallaroo.source("Source 1", source_config)
    partial_pipeline1 = inputs1.to(computation1)

    inputs2 = wallaroo.source("Source 2", source_config)
    partial_pipeline2 = inputs2.to(computation2)

    partial_pipeline = partial_pipeline1.merge(partial_pipeline2)

The resulting partial pipeline could be diagrammed as follows:

    Source1 -> computation1 ->\
                               \
                                ->
                               /
    Source2 -> computation2 ->/

Again, the hanging arrow indicates that we can still add more stages, and that to complete the pipeline we still need one or more sinks. You could also merge this partial pipeline with additional partial pipelines. When you merge partial pipelines in this way, you are not creating a join in the sense familiar from SQL. Instead, you are combining two streams into one, with messages from the first stream interwoven with messages from the second. That combined stream is then passed to the next stage following the hanging arrow.
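The difference between a merge and a join can be shown with a small pure-Python model. The sketch below interleaves two streams at random while preserving each stream's internal order. The output has one entry per input message rather than one entry per matched pair. The random interleaving stands in for the non-deterministic arrival order and is an assumption of the sketch.

```python
import random


def merge_streams(stream1, stream2, seed=0):
    """Interleave two streams, preserving each stream's own order.

    Models merge(): a single combined stream with no pairing of
    messages across the two inputs (unlike a SQL-style join).
    """
    rng = random.Random(seed)
    i = j = 0
    merged = []
    while i < len(stream1) or j < len(stream2):
        take_first = j >= len(stream2) or (i < len(stream1) and rng.random() < 0.5)
        if take_first:
            merged.append(stream1[i])
            i += 1
        else:
            merged.append(stream2[j])
            j += 1
    return merged


orders = ["o1", "o2", "o3"]
quotes = ["q1", "q2"]
merged = merge_streams(orders, quotes)
print(len(merged))  # 5
```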
The following is an example of a complete pipeline including a merge, where we first add one more computation before the sink:

    pipeline = (partial_pipeline1.merge(partial_pipeline2)
        .to(computation3)
        .to_sink(sink_config))

The corresponding diagram for this definition would look like this:

    Source1 -> computation1 ->\
                               \
                                -> computation3 -> Sink
                               /
    Source2 -> computation2 ->/

Building an Application
Once you have defined a complete pipeline, you must pass it into `wallaroo.build_application(app_name, pipeline)` in order to build the application object that you must return from the `application_setup` function. For a simple application with a decoder, computation, and encoder, the `application_setup` function might look like this:

    def application_setup(args):
        inputs = wallaroo.source("Source Name", source_config)
        pipeline = (inputs
            .to(computation)
            .to_sink(sink_config))
        return wallaroo.build_application("Application Name", pipeline)

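In this API, each stage call returns a new pipeline value instead of mutating a builder. The toy class below models that shape in plain Python. `ToyPipeline` is hypothetical and not part of Wallaroo. Its `to()` and `to_sink()` return new frozen objects, so a partial pipeline can be reused without being changed. A stage that returns `None` drops the message, loosely mirroring a computation with no output.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Tuple


@dataclass(frozen=True)
class ToyPipeline:
    """Hypothetical stand-in for a Wallaroo pipeline value (not the real class)."""

    source: str
    stages: Tuple[Callable, ...] = ()
    sink: Optional[Callable] = None

    def to(self, fn):
        # Return a new pipeline with one more stage; self is left unchanged.
        return ToyPipeline(self.source, self.stages + (fn,), self.sink)

    def to_sink(self, sink):
        # Terminating a partial pipeline also yields a new object.
        return ToyPipeline(self.source, self.stages, sink)

    def run(self, messages):
        """Push messages through every stage; a None result drops the message."""
        if self.sink is None:
            raise ValueError("pipeline is partial: call to_sink() first")
        for msg in messages:
            for stage in self.stages:
                msg = stage(msg)
                if msg is None:
                    break
            else:
                self.sink(msg)


collected = []
partial = ToyPipeline("Source Name").to(str.strip)
complete = partial.to(str.upper).to_sink(collected.append)
complete.run(["  hello ", "wallaroo"])
print(collected)            # ['HELLO', 'WALLAROO']
print(len(partial.stages))  # 1: the partial pipeline was not mutated
```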
Defining Computations
There are two types of computations that can be added to a Wallaroo pipeline: stateless and state computations. The API for stateless computation has not changed, so we will only discuss state computations here.
A state computation takes an input message and a state object, does some work which might involve updating that state, and then optionally returns an output that will be sent downstream. Here is an example of a simple state computation taken from our Word Count example:

    class WordTotal(object):
        def __init__(self):
            self.count = 0

    @wallaroo.state_computation(name="count word", state=WordTotal)
    def count_word(word, word_total):
        word_total.count = word_total.count + 1
        return WordCount(word, word_total.count)

The `count_word` function takes an input called `word` and the state representing the running total called `word_total`. We specify the associated state class by passing `WordTotal` as the decorator argument `state`. In this case, the function always returns an output `WordCount` object (defined elsewhere).
To add this state computation to a Wallaroo pipeline, we simply add the following to our pipeline definition (see above for a description of how to define a complete pipeline):

    .to(count_word)

If you want to partition this state by the word that is being counted, then you must define a `key_extractor` function:

    @wallaroo.key_extractor
    def extract_word(word):
        return word

You must then precede the `to` call for our state computation with a `key_by` call using the key extractor, as in the following example:

    .key_by(extract_word)
    .to(count_word)

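To see what `key_by` changes, the following pure-Python sketch runs the `count_word` computation from above first with one `WordTotal` per key and then with a single shared `WordTotal`. The `WordCount` namedtuple and the `run_state_stage` helper are hypothetical stand-ins. The `@wallaroo.state_computation` decorator is omitted so the sketch runs without Wallaroo installed.

```python
from collections import namedtuple

# Hypothetical stand-in for the WordCount output class "defined elsewhere".
WordCount = namedtuple("WordCount", ["word", "count"])


class WordTotal(object):
    def __init__(self):
        self.count = 0


# Same body as the example above, minus the @wallaroo.state_computation decorator.
def count_word(word, word_total):
    word_total.count = word_total.count + 1
    return WordCount(word, word_total.count)


def extract_word(word):
    return word


def run_state_stage(messages, computation, state_cls, key_fn=None):
    """Toy model of a state stage (hypothetical helper, not Wallaroo code).

    With key_fn (like key_by), each key gets its own state object.
    Without it, every message shares a single state object.
    """
    states = {}
    outputs = []
    for msg in messages:
        key = key_fn(msg) if key_fn is not None else None
        if key not in states:
            states[key] = state_cls()
        outputs.append(computation(msg, states[key]))
    return outputs


words = ["a", "b", "a"]
keyed = run_state_stage(words, count_word, WordTotal, key_fn=extract_word)
shared = run_state_stage(words, count_word, WordTotal)
print([wc.count for wc in keyed])   # [1, 1, 2]
print([wc.count for wc in shared])  # [1, 2, 3]
```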
Converting to the New API
Converting an old Wallaroo application to the new API should be a relatively straightforward process. This section will take you through what is required.
Defining an Application Source
In the old API, applications were defined using an `ApplicationBuilder` object. This is no longer part of the API. Instead, you begin by defining a source and one or more stages immediately following the source. The old Word Count application began this way:

    def application_setup(args):
        in_host, in_port = wallaroo.tcp_parse_input_addrs(args)[0]
        out_host, out_port = wallaroo.tcp_parse_output_addrs(args)[0]

        ab = wallaroo.ApplicationBuilder("Word Count Application")
        ab.new_pipeline("Split and Count",
                        wallaroo.TCPSourceConfig(in_host, in_port, decoder))

The new version begins like this:

    def application_setup(args):
        in_host, in_port = wallaroo.tcp_parse_input_addrs(args)[0]
        out_host, out_port = wallaroo.tcp_parse_output_addrs(args)[0]

        lines = wallaroo.source("Split and Count",
                    wallaroo.TCPSourceConfig(in_host, in_port, decode_lines))

Both versions require an entry point function called `application_setup` and some setup around the TCP source and sink addresses. In the old example, we create an `ApplicationBuilder` and add a pipeline to it. In the new version, we forgo the `ApplicationBuilder` and instead define a source stream using `wallaroo.source()`.
Adding a Stateless Computation Stage
For Word Count, the first computation splits the incoming lines into individual words. The definition of stateless computations themselves is the same across APIs; what has changed is how they are added to the pipeline definition. In the old API, you had to choose between `to()` and `to_parallel()`, depending on whether the computation was going to be parallelized. However, one of the design goals of Wallaroo is to provide a scale-independent API that allows you to focus on your business logic. We decided that this choice between `to` and `to_parallel` represented a conflation of concerns. In the new API, all computations are potentially parallelized, and there is only one corresponding call, `to()`.
With the old API, we had:

    ab = wallaroo.ApplicationBuilder("Word Count Application")
    ab.new_pipeline("Split and Count",
                    wallaroo.TCPSourceConfig(in_host, in_port, decoder))
    ab.to_parallel(split)

With the new:

    lines = wallaroo.source("Split and Count",
                wallaroo.TCPSourceConfig(in_host, in_port, decode_lines))
    pipeline = (lines
        .to(split))

This brings us to another difference. In the old API, each API call mutated the `ApplicationBuilder`. In the new API, a call to `to()` returns a new immutable pipeline object. You can chain many of these calls together to define a more complex pipeline.
Adding a State Computation Stage
The next stage in the Word Count application involves state: this is where we keep a running total for each word. In the old API, this looked like:

    ab.to_state_partition(count_word, WordTotals, "word totals",
                          partition, word_partitions)

You passed in the state computation, the corresponding state class, a string uniquely identifying that state, a partition function for determining how to partition state, and a list of initial partitions. With the new API, you would do the following instead:

    .key_by(extract_word)
    .to(count_word)

First of all, notice that partitioning by key is no longer conflated with the definition of the state computation stage itself. If you remove the `key_by` call, then all messages will be sent to the same state (corresponding to the former `to_stateful` call). Second, notice that you no longer assign the state a unique string, and you no longer provide an initial list of partitions.
There are a couple of related differences. First, in the old API, you defined a partition function as follows:

    @wallaroo.partition
    def partition(word):
        if word[0] >= "a" and word[0] <= "z":
            return word[0]
        else:
            return "!"

In the old version, we used the first letter of the word as our partitioning key because, if we had used the words themselves as keys, Wallaroo would have created a new actor to handle every word. In the new version of Wallaroo, we use two levels of hashing when partitioning so that we can reduce the number of actors required to handle large sets of keys. The new version uses the following function instead:

    @wallaroo.key_extractor
    def extract_word(word):
        return word

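The sketch below illustrates the general idea behind two-level hashing under stated assumptions. It is not Wallaroo's actual routing code. A stable hash first maps each key to one of a fixed pool of actors. Each actor then keeps per-key state in a dictionary, which is the second hash lookup. However many distinct words arrive, the number of actors stays fixed. The pool size of 4 and the use of SHA-256 are arbitrary choices for illustration.

```python
import hashlib
from collections import defaultdict

NUM_ACTORS = 4  # hypothetical fixed pool size


def stable_hash(key):
    # Built-in hash() for str is salted per process; a digest is stable.
    return int.from_bytes(hashlib.sha256(key.encode("utf-8")).digest()[:8], "big")


class StateActor:
    """Holds state for many keys in a dict (the second level of hashing)."""

    def __init__(self):
        self.states = defaultdict(int)

    def count(self, key):
        self.states[key] += 1
        return self.states[key]


actors = [StateActor() for _ in range(NUM_ACTORS)]


def process(word):
    actor = actors[stable_hash(word) % NUM_ACTORS]  # first level: key -> actor
    return actor.count(word)                         # second level: key -> state


words = ["apple", "banana", "cherry", "apple", "date", "elderberry", "apple"]
results = [process(w) for w in words]
print(results)  # [1, 1, 1, 2, 1, 1, 3]
```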
We have renamed the decorator from `partition` to `key_extractor` to more closely describe what the function is actually doing.
The last point we need to explain is the difference between the old and new state computation definitions themselves. Let's take a look at the old version:

    @wallaroo.state_computation(name="Count Word")
    def count_word(word, word_totals):
        word_totals.update(word)
        return (word_totals.get_count(word), True)

The state computation takes an input (here it's `word`) and our state object (here `word_totals`). It then returns a tuple. The first element of the tuple is either an output or `None` if there is no output. The second element is a Boolean telling Wallaroo whether state was updated. In the new API, we no longer return this Boolean. Instead, the state computation returns either an output or `None`, and that's it. Here's the new example:

    @wallaroo.state_computation(name="count word", state=WordTotal)
    def count_word(word, word_total):
        word_total.count = word_total.count + 1
        return WordCount(word, word_total.count)

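A small adapter can illustrate the change in return convention by dropping the Boolean from the old `(output, state_changed)` tuple. This is a hypothetical helper, not part of Wallaroo. The `WordTotals` class here is a minimal stand-in for the old example's state, and its `get_count` returns a plain integer. In practice, rewriting each function to return its output directly, as in the new example, is simpler.

```python
def adapt_old_state_computation(old_fn):
    """Wrap an old-style state function so it follows the new convention.

    Old style returned (output_or_None, state_changed); the new style
    returns just the output or None, so the Boolean is discarded.
    """
    def new_fn(data, state):
        output, _state_changed = old_fn(data, state)
        return output
    return new_fn


class WordTotals(object):
    # Minimal hypothetical version of the old example's state class.
    def __init__(self):
        self.totals = {}

    def update(self, word):
        self.totals[word] = self.totals.get(word, 0) + 1

    def get_count(self, word):
        return self.totals[word]


def old_count_word(word, word_totals):
    word_totals.update(word)
    return (word_totals.get_count(word), True)


count_word = adapt_old_state_computation(old_count_word)
state = WordTotals()
print(count_word("hi", state), count_word("hi", state))  # 1 2
```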
The other difference to note is that instead of passing the state class into the pipeline definition (as the old `to_state_partition()` call did), we pass it in as an argument to the state computation decorator (`state=WordTotal`).
Applications with Multiple Sources
In the old API, defining an application involved adding one or more pipelines to an `ApplicationBuilder` using the `new_pipeline` call. These pipelines intersected by referring to the same state name string in a `to_state_partition` call. For example, our Market Spread example has two sources of data. The first is a stream of orders that are checked against market data and then potentially generate rejection alerts that are sent to a sink. The second is a stream of market data updates that are used to keep our market data state up to date. The definition of the application looked like this:

    ab = wallaroo.ApplicationBuilder("market-spread")
    ab.new_pipeline(
            "Orders",
            wallaroo.TCPSourceConfig(order_host, order_port, order_decoder)
        ).to_state_partition(
            check_order, SymbolData, "symbol-data",
            symbol_partition_function, symbol_partitions
        ).to_sink(wallaroo.TCPSinkConfig(out_host, out_port, order_result_encoder)
        ).new_pipeline(
            "Market Data",
            wallaroo.TCPSourceConfig(nbbo_host, nbbo_port, market_data_decoder)
        ).to_state_partition(
            update_market_data, SymbolData, "symbol-data",
            symbol_partition_function, symbol_partitions
        ).done()

Because you passed `"symbol-data"` as a state name to the `to_state_partition` call in each pipeline definition, these pipelines would intersect there. However, this was error-prone and involved some unexpected behavior, the most notable of which was that the state partition would only use the configuration provided in one of the two `to_state_partition` calls.
With the new API, you would do the same thing as follows:

    orders = wallaroo.source("Orders",
        wallaroo.TCPSourceConfig(order_host, order_port, decode_order))
    market_data = wallaroo.source("Market Data",
        wallaroo.TCPSourceConfig(nbbo_host, nbbo_port, decode_market_data))

    pipeline = (orders.merge(market_data)
        .key_by(extract_symbol)
        .to(check_market_data)
        .to_sink(wallaroo.TCPSinkConfig(out_host, out_port, encode_order_result)))

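Because both streams flow through a single stage keyed by symbol, `check_market_data` receives orders and market data updates alike and has to tell them apart. The pure-Python sketch below shows one way to structure that. The `Order` and `MarketData` types, the `SymbolData` fields, and the rejection rule are all hypothetical. The rule rejects orders priced outside the current bid/offer, and it does not reproduce the real Market Spread logic.

```python
from collections import namedtuple

# Hypothetical message types; the real Market Spread example defines its own.
Order = namedtuple("Order", ["symbol", "price"])
MarketData = namedtuple("MarketData", ["symbol", "bid", "offer"])


class SymbolData(object):
    def __init__(self):
        self.bid = None
        self.offer = None


def extract_symbol(msg):
    return msg.symbol


def check_market_data(msg, symbol_data):
    """One state computation for the merged stream: update state or check an order."""
    if isinstance(msg, MarketData):
        symbol_data.bid, symbol_data.offer = msg.bid, msg.offer
        return None  # market data updates produce no output
    if symbol_data.bid is None:
        return None  # no market data yet for this symbol
    # Hypothetical rule: reject orders priced outside the current bid/offer.
    if not (symbol_data.bid <= msg.price <= symbol_data.offer):
        return ("reject", msg.symbol, msg.price)
    return None


states = {}
alerts = []
for msg in [MarketData("ACME", 10.0, 10.5), Order("ACME", 11.0), Order("ACME", 10.2)]:
    state = states.setdefault(extract_symbol(msg), SymbolData())
    result = check_market_data(msg, state)
    if result is not None:
        alerts.append(result)

print(alerts)  # [('reject', 'ACME', 11.0)]
```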
The `orders` and `market_data` source streams are first defined separately. Then they are merged together via the `merge()` call (see above for more detail about how this works).
Building an Application
Finally, once we have defined an application, we must return it so that Wallaroo can transform it into an execution graph in a running cluster. The old way of doing this was via another call to the `ApplicationBuilder`:

    return ab.build()

Since we no longer use an `ApplicationBuilder`, we do the following instead, passing the name of the application and the pipeline we defined into `wallaroo.build_application()`:

    return wallaroo.build_application("Word Count Application", pipeline)

Installing Wallaroo
There are currently four ways that you can install Wallaroo:

- Docker
- Vagrant
- The Wallaroo Up script
- From Source

Upgrading Wallaroo
In all cases below, if you run into issues, please reach out to us! We're available on Twitter, IRC, GitHub, by email, on our mailing list, or on our subreddit. We love questions!
If you have made no changes to Wallaroo or Pony since installation, your best bet will be to start from scratch, following the instructions of your choice.
Below are instructions for Upgrading Wallaroo via Wallaroo Up, Upgrading Wallaroo in Docker, Upgrading Wallaroo in Vagrant, and Upgrading Wallaroo when compiled from source.
Upgrading Wallaroo via Wallaroo Up
The normal Wallaroo Up installation instructions will install new versions next to existing versions.
Upgrading the Wallaroo Docker image
To upgrade the Wallaroo Docker image, run the following command to get the latest image. If you don't allow a non-root user to run Docker commands, you'll need to add `sudo` to the front of the command.

    docker pull wallaroo-labs-docker-wallaroolabs.bintray.io/release/wallaroo:0.6.0

Upgrading Wallaroo Source Code
If you mounted the Wallaroo source code to your local machine using the directory recommended in setup, `/tmp/wallaroo-docker` (UNIX & MacOS users) or `c:\wallaroo-docker` (Windows users), then you will need to move the existing directory in order to get the latest source code. The latest Wallaroo source code will be copied to this directory automatically when a new container is started with the latest Docker image.
UNIX & MacOS Users
For UNIX & MacOS users, you can move the directory with the following command:

    mv /tmp/wallaroo-docker/wallaroo-src/ /tmp/wallaroo-docker/wallaroo-0.5.4-src/

Windows Users
For Windows users, you can move the directory with the following command:

    move c:\wallaroo-docker\wallaroo-src c:\wallaroo-docker\wallaroo-0.5.4-src

Once done moving, you can re-create the `wallaroo-src` directory with the following command:

    mkdir c:\wallaroo-docker\wallaroo-src

Upgrading Wallaroo in Vagrant
To upgrade your Wallaroo installation in Vagrant, you'll want to follow the latest installation instructions for Wallaroo in Vagrant.
If you have modified your old Vagrant VM in any way that you intend to persist, you should persist your changes now. For example, copy any edited or new files from the old Vagrant VM to the new one.
Upgrading Wallaroo when compiled from source
These instructions are for Ubuntu Linux. It's assumed that if you are using a different operating system, you are able to translate these instructions to your OS of choice.
Upgrading ponyc to 0.25.0
`ponyc` can be upgraded with the following command:

    sudo apt-get install --only-upgrade ponyc=0.25.0

Verify that you are now on the correct version of ponyc by running:

    ponyc --version

You should get the following output:

    0.25.0 [release]

How to Upgrade Wallaroo
Once you're on the latest ponyc and pony-stable, you're ready to switch over to Wallaroo 0.6.0.
If you have made prior changes to the Wallaroo code, you'll need to re-implement those changes. To get the latest release, assuming that you previously installed to the directory we recommended in setup, you'll need to run the following:

    cd ~/wallaroo-tutorial/

To get a new copy of the Wallaroo repository, run the following commands:

    cd ~/wallaroo-tutorial/
    curl -L -o wallaroo-0.6.0.tar.gz 'https://wallaroo-labs.bintray.com/wallaroolabs-ftp/wallaroo/0.6.0/wallaroo-0.6.0.tar.gz'
    mkdir wallaroo-0.6.0
    tar -C wallaroo-0.6.0 --strip-components=1 -xzf wallaroo-0.6.0.tar.gz
    rm wallaroo-0.6.0.tar.gz
    cd wallaroo-0.6.0

You can then run the following commands to build the necessary tools to continue developing using Wallaroo 0.6.0:

    cd ~/wallaroo-tutorial/wallaroo-0.6.0
    make build-machida build-machida3 build-giles-all build-utils-cluster_shutdown

CHANGELOG
[0.6.0] - 2018-11-30
Fixed
- Gradually back off when attempting to reconnect on data channel
- There is no longer a problem when using more workers than there are partitions
- No longer treat state computation stages as a special case. This results in fewer allocations and better performance
Added
- Python 3 Support for Connectors
- Add parallel stateless steps to joining workers
Changed
- Streamlined Wallaroo Python API
- Connectors API Update
- Simplify the Python API for adding a computation to a pipeline