It allows for data pipeline creation separate from execution, so instead of writing:
result = step3(step2(step1(data)))
#or
result_1 = step1(data)
result_2 = step2(result_1)
result_3 = step3(result_2)
You can write:
pipeline = step1 | step2 | step3
result = pipeline(data)
You may not need a new syntax, but composing a data pipeline before executing it has several advantages:
- Reuse. Compose a pipeline once and use it in multiple places, including as part of larger pipelines:
# load, parse, clean, validate, and format are functions
preprocess = load | parse | clean | validate | format
# preprocess is now a MetaFunction, and can be reused
clean_data1 = preprocess('path/to/data/file')
clean_data2 = preprocess('path/to/different/file')
# preprocess can be included in larger pipelines
pipeline = preprocess | step1 | step2 | step3
- Readability. step1 | step2 | step3 is both read and executed from left to right, unlike step3(step2(step1())), which is executed from the innermost function outwards.
- Inspection. Can't remember what your MetaFunction does? str will tell you:
>>> str(preprocess)
"(load | parse | clean | validate | format)"
- Advanced Composition. Anything beyond simple function chaining becomes difficult using traditional methods. What if you want to send the result of step1 to both step2 and step3, then sum the results? The traditional approach requires an intermediate variable and can quickly become unwieldy:
result1 = step1(data)
result2 = step2(result1) + step3(result1)
Using metafunctions, you can declare a pipeline that does the same thing:
pipeline = step1 | step2 + step3
result = pipeline(data)
MetaFunctions supports Python 3.5+ (tested to 3.10+)
pip install metafunctions
Conceptually, a MetaFunction is a function that contains other functions. When you call a MetaFunction, the MetaFunction calls the functions it contains.
You can create a MetaFunction using the node decorator:
from metafunctions import node

@node
def get_name(prompt):
    return input(prompt)

@node
def say_hello(name):
    return 'Hello {}!'.format(name)
MetaFunctions override certain operators to allow for composition. For example, the following creates a new MetaFunction that combines get_name and say_hello:
greet = get_name | say_hello
When we call the greet MetaFunction, it calls both its internal functions in turn.
# First, `get_name` is called, which prints our prompt to the screen.
# If we enter 'Tom' at the prompt, the second function returns the string 'Hello Tom!'
greeting = greet('Please enter your name ')
print(greeting) # Hello Tom!
MetaFunctions are also capable of upgrading regular functions to MetaFunctions at composition time, so we can simplify our example by composing say_hello directly with the builtin input and print functions:
>>> greet = input | say_hello | print
>>> greet('Please enter your name: ')
# Please enter your name: Tom
# Hello Tom!
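To make the operator-based composition above more concrete, here is a minimal sketch of how `|` and `+` could be overloaded in plain Python. The `Pipe` class and the `double`, `increment`, and `square` functions are hypothetical names invented for this illustration; this is not the MetaFunctions implementation, only a toy version of the same idea.

```python
class Pipe:
    """Hypothetical composable wrapper; NOT the library's MetaFunction class."""

    def __init__(self, func, name=None):
        self.func = func
        self.name = name or getattr(func, "__name__", repr(func))

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    @staticmethod
    def _wrap(other):
        # Upgrade a plain callable to a Pipe at composition time.
        return other if isinstance(other, Pipe) else Pipe(other)

    def __or__(self, other):
        # self | other: feed the output of self into other.
        other = Pipe._wrap(other)
        return Pipe(lambda *a, **kw: other(self(*a, **kw)),
                    "({} | {})".format(self.name, other.name))

    def __ror__(self, other):
        # plain_function | self: Python falls back to the reflected operator.
        return Pipe._wrap(other) | self

    def __add__(self, other):
        # self + other: call both with the same input and add the results.
        other = Pipe._wrap(other)
        return Pipe(lambda *a, **kw: self(*a, **kw) + other(*a, **kw),
                    "({} + {})".format(self.name, other.name))

    def __str__(self):
        return self.name


@Pipe
def double(x):
    return x * 2


@Pipe
def increment(x):
    return x + 1


@Pipe
def square(x):
    return x * x


# `+` binds tighter than `|`, so this reads as double | (increment + square).
pipeline = double | increment + square
result = pipeline(3)       # double(3) = 6, then increment(6) + square(6) = 43
description = str(pipeline)  # "(double | (increment + square))"

# The builtin `len` is upgraded automatically through __ror__.
measure = len | double
```

Because Python's `+` has higher precedence than `|`, a fan-out such as `increment + square` is grouped before it is chained, which is why no extra parentheses are needed in the pipeline declaration.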
Errors in composed functions can be confusing. If an exception occurs in a MetaFunction, the exception traceback will tell you which function the exception occurred in. But what if that function appears multiple times in the data pipeline?
Imagine this function, which downloads stringified numeric data from a web API:
>>> compute_value = (query_volume | float) * (query_price | float)
>>> compute_value('http://prices.com/123')
Here we've assumed that query_volume and query_price will return strings that convert cleanly to floats, but what if something goes wrong?
>>> compute_value('http://prices.com/123')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
...
builtins.ValueError: could not convert string to float: '$800'
We can deduce that float conversion failed, but which float function raised the exception? MetaFunctions addresses this with the locate_error metafunction decorator, which adds a location information string to any exception raised within the pipeline:
>>> from metafunctions import locate_error
>>> with_location = locate_error(compute_value)
>>> with_location('http://prices.com/123')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
...
builtins.ValueError: could not convert string to float: '$800'
Occured in the following function: ((query_volume | float) * (query_price | ->float<-))
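The idea of attaching a location to an exception can be sketched in plain Python. The helper below, `run_with_location`, is a hypothetical name made up for this example and only handles a linear list of steps; it is an assumption about one possible approach, not how locate_error is implemented.

```python
def run_with_location(steps, value):
    """Run `steps` in order; on failure, append a marker showing which step raised.

    Hypothetical helper for illustration only.
    """
    for index, step in enumerate(steps):
        try:
            value = step(value)
        except Exception as exc:
            names = [getattr(s, "__name__", repr(s)) for s in steps]
            # Mark the failing step by position, so repeated functions stay distinguishable.
            names[index] = "->{}<-".format(names[index])
            location = " | ".join(names)
            detail = exc.args[0] if exc.args else ""
            exc.args = ("{}\nFailed in: ({})".format(detail, location),) + exc.args[1:]
            raise
    return value


# A successful run simply returns the final value.
ok = run_with_location([str.strip, float, abs], " -8.5 ")

# A failing run keeps the original exception type and adds the location.
try:
    run_with_location([str.strip, float, abs], " $800 ")
except ValueError as error:
    message = str(error)
```

Marking the step by its position rather than by its name is what resolves the ambiguity when the same function appears more than once in a pipeline.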
Metafunctions provides utilities for constructing advanced function pipelines.
-
store / recall: store the output of the preceding function, and recall it later to pass to a different function. For example:
# The following pipeline sends the output of `a` to `b`, and also adds it to the output of `c`
p = a | store('a') | b | recall('a') + c
-
mmap: A MetaFunction decorator that wraps a function (or MetaFunction) and calls it once per item in the input it receives. This allows you to create loops in function pipelines:
(1, 2, 3) | mmap(process) # <- equivalent to (1, 2, 3) | (process & process & process)
mmap duplicates the behaviour of the builtin map function.
-
star: Calls the wrapped MetaFunction with *args instead of args (it's analogous to lambda args, **kwargs: metafunction(*args, **kwargs)). This allows you to incorporate functions that accept more than one parameter into your function pipeline:
@node
def f(result1, result2):
    ...
# When cmp is called, f will receive the results of both a and b as positional args
cmp = (a & b) | star(f)
star can be combined with the above mmap to duplicate the behaviour of itertools.starmap:
starmap = star(mmap(f))
For more discussion of star, see this pull request
-
concurrent: experimental, requires an OS that provides os.fork()
Consider the following long running MetaFunction:
process_companies = get_company_data | filter | process
process_customers = get_customer_data | filter | process
process_all = process_companies & process_customers
Assuming the component functions in the process_all MetaFunction follow good functional practices and do not have side effects, it's easy to see that process_companies and process_customers are independent of each other. If that's the case, we can safely execute them in parallel. The concurrent metafunction decorator allows you to specify steps in the function pipeline to execute in parallel:
from metafunctions import concurrent
do_large_calculation_async = concurrent(process_companies + process_customers)
concurrent can be combined with mmap to create an asynchronous map, similar to multiprocessing.pool.map:
map_async = concurrent(mmap(f))
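The reasoning behind running independent branches in parallel can be illustrated with the standard library alone. The sketch below swaps in a thread pool from `concurrent.futures` rather than `os.fork()`, so it is not equivalent to the concurrent decorator; `run_branches`, `count_words`, and `count_vowels` are hypothetical names chosen for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def run_branches(branches, value):
    """Call every branch with the same input in parallel; results keep branch order.

    Hypothetical helper; assumes the branches are side-effect free and independent.
    """
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = [pool.submit(branch, value) for branch in branches]
        return tuple(future.result() for future in futures)


def count_words(text):
    return len(text.split())


def count_vowels(text):
    return sum(text.count(vowel) for vowel in "aeiou")


# Both branches receive the same input and run independently of each other.
words, vowels = run_branches([count_words, count_vowels], "compose then execute")
```

Collecting results in submission order keeps the output deterministic even though the branches may finish in any order, which is only safe to rely on when the branches do not share mutable state.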