Domain-Specific Language
Note: none of this is actually implemented yet. The purpose of this page is, at the moment, to demonstrate how we anticipate developers will (eventually) use Maru once it becomes a general computational platform. Most, if not all, of this will likely change in the future.
Maru developers will be able to program their operators with Jutsu (name tentative), our DSL. Jutsu compiles down to a set of STARK-VM binaries (one for each operator) and a description file that specifies their inputs, how to chain them together, and how to graft them onto the existing dataflow graph. These binaries and description files will then be uploaded to an "operator registry", from which the protocol will update its view of the dataflow and execute it.
Here are some examples of how we expect Jutsu programs to look:
Unary map
Let's say the user wanted to write an operator that takes a collection of ratios expressed as 64-bit fixed-point decimal numbers (represented as u64s) and turns them into percentages. They can define a function and then use the map operator to convert the ratios.
// They might write it like this
fn to_pct(ratio: u64): u64 {
  return ratio * 100
}
// or like this
fn to_pct = (ratio: u64) => ratio * 100
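For intuition, here's roughly what the map operator does with this function, sketched in plain Rust rather than Jutsu (the 1e9 fixed-point scale and the concrete values are assumptions made for illustration):
// Plain-Rust sketch (not Jutsu) of the unary map's semantics.
// Assumption: ratios are 64-bit fixed-point values at a 1e9 scale.
fn to_pct(ratio: u64) -> u64 {
    ratio * 100
}

fn main() {
    let ratios: Vec<u64> = vec![500_000_000, 250_000_000]; // 0.5 and 0.25 at 1e9 scale
    // `map` applies `to_pct` to every element of the collection.
    let pcts: Vec<u64> = ratios.iter().map(|r| to_pct(*r)).collect();
    println!("{:?}", pcts); // 50.0 and 25.0 percent, still at 1e9 scale
}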
Binary map
Let's say the user wanted to write an operator that takes a collection of NFT prices in wei and converts them to USDC given the current exchange rate (a single-element collection). They can "merge" the two collections into a single collection of tuples using the zip operator. Depending on what the two streams in question are, zip may be implemented differently under the hood by the compiler. In this case, the zip can be elided away completely and the exchange rate pushed directly as an input into the underlying VM of the map operator.
// They might write it like this:
fn nft_price_to_usdc((meta, price): (NFTMeta, u256), exchange_rate: u256): (NFTMeta, u256) {
  let converted_price = price * exchange_rate;
  (meta, converted_price)
}
// or like this
fn nft_price_to_usdc = ((meta, price): (NFTMeta, u256), exchange_rate: u256) => (meta, price * exchange_rate)
// then, they can build the dataflow and `export` the output collection:
export collection nfts_priced_in_usdc = @input("PRICE_STREAM_ID")
.zip("EXCHANGE_RATE_STREAM_ID")
.map(nft_price_to_usdc)
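To see why the zip can be elided here, consider this plain-Rust sketch of the semantics (not Jutsu; the NftMeta type, u128 prices, and std::iter::repeat standing in for alignment against a single-element collection are illustrative assumptions). Zipping with a singleton is equivalent to handing the exchange rate straight to the map function:
// Plain-Rust sketch (not Jutsu): zipping a price stream with a single-element
// exchange-rate collection is equivalent to passing the rate straight into the map.
#[derive(Debug, Clone)]
struct NftMeta { id: u64 }

fn nft_price_to_usdc(item: (NftMeta, u128), exchange_rate: u128) -> (NftMeta, u128) {
    let (meta, price) = item;
    (meta, price * exchange_rate)
}

fn main() {
    let prices = vec![(NftMeta { id: 1 }, 10u128), (NftMeta { id: 2 }, 3u128)];
    let exchange_rate = vec![2_000u128]; // the "single-element collection"

    // Semantically, a zip of the two collections followed by a map...
    let zipped: Vec<_> = prices
        .iter()
        .cloned()
        .zip(std::iter::repeat(exchange_rate[0]))
        .map(|(item, rate)| nft_price_to_usdc(item, rate))
        .collect();

    // ...which the compiler can elide into a map that takes the rate as a direct input.
    let elided: Vec<_> = prices
        .iter()
        .cloned()
        .map(|item| nft_price_to_usdc(item, exchange_rate[0]))
        .collect();

    assert_eq!(format!("{:?}", zipped), format!("{:?}", elided));
}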
Filter for all of the prices for ETH-USDC swaps on Uniswap
Let's say the user wanted to write an operator that produces a collection of prices for every ETH-USDC swap on Uniswap, given a collection of every Ethereum log ever. They can do this using the filter operator.
const USDC_ETH_SWAP_LOG_SIGNATURE = "CONTRACT_LOG_SIGNATURE"
export collection filtered_logs = @input("ETHEREUM_LOGS")
.filter(log => log.signature == USDC_ETH_SWAP_LOG_SIGNATURE)
export collection prices = filtered_logs.map(log => log.price)
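For intuition, here's roughly what that filter and map do, sketched in plain Rust rather than Jutsu (the Log fields and the placeholder values are illustrative assumptions):
// Plain-Rust sketch (not Jutsu) of the filter-then-map over a log collection.
const USDC_ETH_SWAP_LOG_SIGNATURE: &str = "CONTRACT_LOG_SIGNATURE";

struct Log {
    signature: String,
    price: u128,
}

fn main() {
    let logs = vec![
        Log { signature: USDC_ETH_SWAP_LOG_SIGNATURE.to_string(), price: 1_999 },
        Log { signature: "SOME_OTHER_SIGNATURE".to_string(), price: 42 },
    ];
    // Keep only the ETH-USDC swap logs, then project out the price.
    let prices: Vec<u128> = logs
        .iter()
        .filter(|log| log.signature == USDC_ETH_SWAP_LOG_SIGNATURE)
        .map(|log| log.price)
        .collect();
    println!("{:?}", prices); // [1999]
}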
Compute a 24-hour block-by-block moving average of the ETH-USDC pair on Uniswap
Let's say the user wants to compute a 24-hour (~7200 blocks) moving average over the ETH-USDC pair on Uniswap. They can do this with the following steps:
- use the ETH-USDC swap logs from the previous example as input
- map the logs to (block_number, price) tuples
- in parallel, do the following:
  - sum the prices for all of the swaps in each block
  - count the number of swaps in each block
- zip the two outputs of the previous step together
- map the (sum, count) pairs to averages (i.e. divide them)
- use an arrangement to index the averages by block number
- use a custom moving_avg operator that takes in an arrangement and computes an average over the "greatest" window_len elements in the arrangement (i.e. the averages for the 7200 most recent blocks)
// Map logs to (block_number, price) tuples, clustered by block number
collection batched_logs = @input("ETH_USDC_SWAP_LOGS")
.map(log => (log.block_number, log.price))
.batch_by_key()
collection volumes = batched_logs.sum_by_key()
collection counts = batched_logs.count_by_key()
// Arrange averages by block number
arrangement block_averages = volumes
.zip(counts)
.map(((block_num, volume), (_, count)) => (block_num, volume / count))
.arrange_by_key()
.consolidate()
export collection moving_avg_24hr = block_averages.moving_avg(7200)
export collection moving_avg_1hr = block_averages.moving_avg(300)
operator moving_avg<T: Div<T> + Add<T>>(window_len: usize) {
input arranged items: T
output single current_window = items
.iter_descending()
.take(window_len)
.map(block_avg => block_avg / window_len)
.sum()
}
The batch_by_key operator tells the compiler that the keys produced by the map function will be clustered in the stream underlying the collection. This allows it to choose a more efficient implementation for subsequent operators. In this case, all logs from the same block will appear adjacent to each other in the stream. The naive sum_by_key version would build an arrangement (expensive); the batch_by_key version instead uses an iteration context, which is much cheaper here.
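Here's a plain-Rust sketch of the difference (not Jutsu; the function names and the use of a BTreeMap as a stand-in for an arrangement are illustrative assumptions):
// Plain-Rust sketch (not Jutsu) contrasting the two strategies over
// (block_number, price) tuples that are already clustered by block number.
use std::collections::BTreeMap;

fn sum_by_key_arranged(items: &[(u64, u128)]) -> Vec<(u64, u128)> {
    // Naive strategy: build an index over every key, analogous to an arrangement.
    let mut index: BTreeMap<u64, u128> = BTreeMap::new();
    for (block, price) in items {
        *index.entry(*block).or_insert(0) += *price;
    }
    index.into_iter().collect()
}

fn sum_by_key_batched(items: &[(u64, u128)]) -> Vec<(u64, u128)> {
    // Batched strategy: because equal keys are adjacent, a single pass with one
    // running total per batch suffices, analogous to an iteration context.
    let mut out: Vec<(u64, u128)> = Vec::new();
    for (block, price) in items {
        if let Some(last) = out.last_mut() {
            if last.0 == *block {
                last.1 += *price;
                continue;
            }
        }
        out.push((*block, *price));
    }
    out
}

fn main() {
    let logs: [(u64, u128); 5] = [(100, 5), (100, 7), (101, 3), (102, 4), (102, 6)];
    assert_eq!(sum_by_key_arranged(&logs), sum_by_key_batched(&logs));
}
The batched version never materializes an index over all keys; it only keeps one running total at a time.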
For the arrangement, the user needs to arrange the averages by block number. They can use the built-in arrange_by_key operator for that. Internally, it builds a merkle tree (not a multiset hash) of the deltas and orders them by key, allowing efficient "random access" over historical updates through membership proofs. They then use the consolidate operator to tell the dataflow to always consolidate deltas for the same keys. This arrangement can be re-used by any number of operators - to demonstrate this, the user also computes a 1-hour (~300 block) moving average using the same arrangement.
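Conceptually, consolidation nets out all deltas recorded for the same key. A plain-Rust sketch of that idea (not Jutsu; the (key, diff) update shape is an illustrative assumption):
// Plain-Rust sketch (not Jutsu): consolidation merges all deltas for the same key
// into a single net delta, dropping keys whose updates cancel out entirely.
use std::collections::BTreeMap;

fn consolidate(deltas: Vec<(u64, i64)>) -> Vec<(u64, i64)> {
    let mut net: BTreeMap<u64, i64> = BTreeMap::new();
    for (key, diff) in deltas {
        *net.entry(key).or_insert(0) += diff;
    }
    net.into_iter().filter(|(_, diff)| *diff != 0).collect()
}

fn main() {
    // Block 100's average is added, retracted, then re-added; block 101 is added once.
    let deltas = vec![(100, 1), (100, -1), (100, 1), (101, 1)];
    assert_eq!(consolidate(deltas), vec![(100, 1), (101, 1)]);
}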
Then they can define a custom moving_avg operator by dropping down to the lower-level differential interface, which requires them to specify three things: input declarations, output declarations, and the logic itself. The logic is expressed the same way you'd define the program itself - conceptually, it can be thought of as a sort of "sub-dataflow". In the moving_avg operator, the user specifies that the items input is an arranged collection using the arranged keyword. Arranged collections allow the operator to perform key-value lookups and iterate over the collection's values. The single keyword on the output declaration signifies that the output is a collection that should always contain a single element. This is syntactic sugar for saying that each input delta should result in "removing the old window and adding the new window". The operator also uses a generic type parameter, signifying that it can build windows over an arranged collection of elements of any type T such that division and addition are well-defined for T.
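Here's a plain-Rust sketch of the operator's logic (not Jutsu). The trait bounds, the From<u32> conversion, and the Vec standing in for an arrangement are illustrative assumptions; the point is the window over the window_len most recent values and the "retract the old window, add the new window" output behaviour:
// Plain-Rust sketch (not Jutsu): a generic average over the `window_len` most
// recent values of an ordered collection (a Vec stands in for the arrangement).
use std::ops::{Add, Div};

fn window_avg<T>(arranged: &[T], window_len: usize) -> T
where
    T: Copy + Add<Output = T> + Div<Output = T> + From<u32>,
{
    arranged
        .iter()
        .rev()            // like iter_descending(): newest values first
        .take(window_len) // only the window_len most recent elements
        .map(|v| *v / T::from(window_len as u32))
        .fold(T::from(0u32), |acc, q| acc + q)
}

fn main() {
    let mut block_averages: Vec<u64> = vec![10, 20, 30];
    let old = window_avg(&block_averages, 2); // (20 + 30) / 2 = 25
    block_averages.push(40);                  // a new input delta arrives
    let new = window_avg(&block_averages, 2); // (30 + 40) / 2 = 35
    // `output single` means this delta shows up as "-old, +new" on the output collection.
    println!("retract {old}, add {new}");
}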
Compiler Optimizations
The Jutsu compiler could be smart enough to consolidate operators expressed semantically into significantly more efficient concrete implementations, given what information it has about them. For instance, in the implementation of moving_avg above, instead of writing
.fold(0, (moving_avg, block_avg) => moving_avg + block_avg / window_len)
we write
.map(block_avg => block_avg / window_len)
.sum()
While the programmer logically expressed more operators, the compiler can assume addition is commutative, so it can produce a differential operator that exploits this fact, avoiding the need to iterate altogether (subtract the quotient for the previous window's oldest value and add the quotient for the new one). In contrast, the compiler can't assume the function given to fold is commutative.
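A plain-Rust sketch of what that optimization buys (not Jutsu; the window contents and helper names are illustrative assumptions):
// Plain-Rust sketch (not Jutsu): because addition is commutative (and associative),
// the sum of quotients can be updated incrementally instead of re-iterating.
fn recompute(window: &[u64], window_len: u64) -> u64 {
    // What the fold-style formulation forces: re-walk the entire window each time.
    window.iter().map(|v| *v / window_len).sum()
}

fn incremental(prev_avg: u64, oldest: u64, newest: u64, window_len: u64) -> u64 {
    // What map + sum permits: subtract the quotient of the value leaving the
    // window and add the quotient of the value entering it.
    prev_avg - oldest / window_len + newest / window_len
}

fn main() {
    let window_len = 4u64;
    let old_window = [8u64, 12, 16, 20];
    let new_window = [12u64, 16, 20, 24]; // 8 leaves the window, 24 enters it

    let prev = recompute(&old_window, window_len);
    assert_eq!(
        incremental(prev, 8, 24, window_len),
        recompute(&new_window, window_len)
    );
}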