Flows and Tasks
This document describes the flow-creation capabilities of the sensezilla framework.
A task is one step in transforming the raw data from sensors to the desired output. The importance of dividing work into tasks is twofold:
- First, we can re-use intermediate results between flows or within the same flow. For instance, if several flows use the same smoothing algorithm on the raw data, then we don't have to run it twice. Therefore, when designing flows, we should share as many common operations as possible.
- Second, debugging will be easier and faster as intermediate results are intrinsically saved between steps. Therefore, we can plot the evolution of the data after each step.
Although there are these benefits to splitting work into smaller tasks, if there isn't a reason to split, it's best to keep things as a monolithic task, since each task incurs some processing and memory overhead.
Any program that can be run on a Linux command line can be treated as a task, provided it does the following (see the sketch after this list):
- Its input and output files are provided on the command line. To be generally compatible, the inputs and outputs should be CSV files with a Unix timestamp in the first column and values in the second column. However, the formats are not checked, so other configurations are possible.
- It outputs its status to the console in the form:
  PROGRESS STEP <current_step> OF <num_steps> "<description of step>" <progress within step> DONE
  e.g. PROGRESS STEP 2 OF 5 "Detecting Transitions" 23% DONE
- It does not hold any additional state besides what is passed to it via its inputs.
- It can be operated fully from the command line and requires no other interaction.
- It has a return code of 0 if and only if it runs successfully and creates its output files. This is important, as errors must be caught to prevent other tasks that depend on this task from running.
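To make this contract concrete, here is a minimal sketch of such a task program. It is hypothetical (my_filter and its halving "filter" step are placeholders, not part of sensezilla), but it follows the conventions above: CSV in and out on the command line, PROGRESS lines on the console, no hidden state, and a zero exit code only on success.

```python
#!/usr/bin/env python3
# Hypothetical task program sketch (not part of sensezilla).
import csv
import sys


def main():
    if len(sys.argv) != 3:
        sys.stderr.write("usage: my_filter <input file> <output file>\n")
        return 1
    in_path, out_path = sys.argv[1], sys.argv[2]

    print('PROGRESS STEP 1 OF 2 "Reading input" 0% DONE')
    with open(in_path) as f:
        # Expect rows of (unix timestamp, value); anything else raises,
        # so the script exits non-zero and dependent tasks are not run.
        rows = [(float(ts), float(val)) for ts, val in csv.reader(f)]

    print('PROGRESS STEP 2 OF 2 "Filtering" 0% DONE')
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        for ts, val in rows:
            writer.writerow([int(ts), 0.5 * val])  # placeholder "filter"

    print('PROGRESS STEP 2 OF 2 "Filtering" 100% DONE')
    return 0  # 0 only when the output file was written successfully


if __name__ == "__main__":
    sys.exit(main())
```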
A .flow file describes a set of tasks that could be executed on a single stream of data. A stream is the tuple (source name, source id). See Sources for more info.
The .flow file is processed line-by-line. The following describes what you can do on each line:
Comments, which are ignored:
# Add a comment by prepending with a '#' character. The rest of the line is ignored
Include external files (assume they are "pasted" into the file):
include "cool-tasks.inc"
Set a key to a value:
varname = value
Or create a list of values for a key:
varname @= value
Or append to a list of values (creates a new list if it doesn't already exist):
varname @+ value
For all of the above, double quotes can be put around the value (for example, if it contains spaces). Important: a space is required on either side of '=', '@=', or '@+'.
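For instance, this hypothetical snippet (the variable names are made up for illustration) combines all three forms:

```
# a single value; quotes are needed because of the spaces
description = "my first flow"
# start a list, then append to it
tasks @= common-fetcher
tasks @+ my_filter-task
```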
You can also add substitutions of static values (these are replaced at .flow file loading):
my_base_directory = "/tmp/base"
my_sub_directory = $(my_base_directory)/sub1
There are also substitutions of dynamic values, which vary with each flow instantiation and are replaced when a flow is run:
analyse.py --from %{TIME_FROM} --to %{TIME_TO} %{SOURCE} %{DEVICE}
Static values: anything loaded by the master configuration file (i.e. server.conf) under the "global" module name, in addition to some pre-provided values. Of note:

| Variable | Description |
| --- | --- |
| root_dir | Root directory of the Python code, i.e. what you get when you check out this repo |
| flow_dir | Contains .flow files and those you might want to include |
| bin_dir | Contains the Testing Tools |
| cpp_bin (from local.conf) | Location of the sensezilla-cpp project's binary output directory, which contains symlinks to the C++ programs |
Dynamic values: the following substitutions are available:

| Substitution | Description |
| --- | --- |
| %{TIME_FROM} | UNIX timestamp of the beginning of the time interval to process |
| %{TIME_TO} | UNIX timestamp of the end of the time interval to process |
| %{SOURCE} | Name of the source to process from |
| %{DEVICE} | Device identifier (e.g. UUID) to process from |
| %{SOURCE.xxx} | Retrieves the variable 'xxx' from the source definition (.src) file, e.g. %{SOURCE.intr} or %{SOURCE.invr} |
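As a hypothetical illustration (analyse.py and its options are made up, as in the earlier example), a task command might forward the processing window and a value from the .src file to the program it runs:

```
analyse-task.cmd = "analyse.py --from %{TIME_FROM} --to %{TIME_TO} --intr %{SOURCE.intr} %{SOURCE} %{DEVICE}"
```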
Suppose we have a program we want to run on raw data retrieved by the "fetcher" task. The command-line of the program is described as:
my_filter <input file> <output file>
To create the task we first add it to the list of tasks:
tasks @+ my_filter-task
Now we should define the command to run for the task:
my_filter-task.cmd = "my_filter %I0 %O0"
Here, %I0 and %O0 indicate the program's input and output files. Up to 10 inputs (%I0-%I9) and outputs (%O0-%O9) can be given to or taken from a program. You can also specify input and output directories with %I0D-%I9D and %O0D-%O9D.
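For example, a task whose first output is a directory rather than a single file could be declared like this (splitter and its flags are hypothetical; the library_builder example near the end of this document uses the same %O0D pattern):

```
splitter-task.cmd = "splitter -csvin %I0 -outdir %O0D"
```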
We can also give the task a profile to use (the default is cpu_bound). The other profiles are io_bound and highmem, for network-bound and high-memory-usage processes respectively.
my_filter-task.profile = cpu_bound
Now we would like to connect the input of my_filter-task to the output of the common-fetcher task. First we should load the common-fetcher task definition from the common library:
include "common.inc"
tasks @+ common-fetcher
Now, we connect the first and only output of the fetcher task to the first and only input of the my_filter-task:
my_filter-task.I0 = common-fetcher.O0
Now, we should add the output of my_filter-task to be an output of the flow as a whole. This is important since the program interpreting the flow will prune out any intermediate steps that don't contribute to producing the output of the flow.
outputs @+ my_filter-task.O0
There is another line which so far does nothing, but could come in handy later. This says the flow is intended to be used on timeseries:
source_types @= TIMESERIES
The file "my_filter.flow" is now:
tasks @+ my_filter-task
my_filter-task.cmd = "my_filter %I0 %O0"
my_filter-task.profile = cpu_bound
include "$(flow_dir)/common.inc"
tasks @+ common-fetcher
my_filter-task.I0 = common-fetcher.O0
outputs @+ my_filter-task.O0
source_types @= TIMESERIES
We can test the flow file using the run_flow.py program.
run_flow.py run --pretend --local --from -5h my_filter.flow openbms 8fbe97ef-37ba-5b5b-ba52-d1f643b34045
Here, openbms is the source name and 8fbe97ef-37ba-5b5b-ba52-d1f643b34045 is the source id. --local says to run the commands serially in the current console window (rather than handing them off to the scheduler), and --pretend says to show what commands would be run without actually executing them. --from -5h says to process the last 5 hours of data.
Assuming your python environment is set up correctly, the output should be:
Created file : testing_my_filter_my_filter-task.O0
Created file : testing_my_filter_common-fetcher.O0
Executing python /mnt/DATA/documents/UCB/Singapore/sensezilla-python/bin/fetcher.py fetch --from 1342820327 --to 1343425127 openbms 8fbe97ef-37ba-5b5b-ba52-d1f643b34045 testing_my_filter_common-fetcher.O0
Executing my_filter testing_my_filter_common-fetcher.O0 testing_my_filter_my_filter-task.O0
You can see that two temporary files were created, corresponding to the outputs of the two tasks. If you did not use --pretend, the commands would actually be run.
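To actually execute the flow locally, run the same command without --pretend:

```
run_flow.py run --local --from -5h my_filter.flow openbms 8fbe97ef-37ba-5b5b-ba52-d1f643b34045
```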
A more complete example flow, which chains the common fetcher, a minimum filter, and the C++ library_builder program, might look like this:
# includes common task defs such as 'common-fetcher' and 'common-minpeakfilter'
include "common.inc"
use_tmp = 1
source_types @= TIMESERIES
tasks @= common-fetcher
tasks @+ common-minfilter
tasks @+ libbuilder
libbuilder.profile = cpu_bound
libbuilder.cmd = "env LD_LIBRARY_PATH=$(cpp_bin) $(cpp_bin)/library_builder -csvin %I0 -outdir %O0D -outprec 13"
# connect inputs and outputs
common-minfilter.I0 = common-fetcher.O0
libbuilder.I0 = common-minfilter.O0
outputs @= libbuilder.O0
#outputs @= common-minfilter.O0