Function Piping

pipe

Most functions return an object in the format { err, res, out, pid, mid, id }. These functions can be chained with pipe, which makes executing multiple messages a breeze.

For example, the following is how deploy uses pipe internally. The execution will be immediately aborted if any of the functions in fns produces an error.

let fns = [
  {
    fn: "spwn",
    args: { module, scheduler, tags, data },
    then: { "args.pid": "pid" },
  },
  { fn: "wait", then: { "args.pid": "pid" } },
  { fn: "load", args: { src, fills }, then: { "args.pid": "pid" } }
]
const { err, res, out, pid } = await this.pipe({ jwk, fns })
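
The abort-on-error behavior described above can be illustrated with a minimal, self-contained sketch. This is not the library's implementation: runPipe and the three stub steps are hypothetical names, and the stubs only mimic the spwn, wait, load order with made-up return values.

```javascript
// Simplified stand-in for a pipe runner (hypothetical, not the library's code).
// Each step returns an object that may carry `err`; the first error stops the run.
async function runPipe(steps) {
  let last = { err: null }
  const executed = []
  for (const { name, fn } of steps) {
    last = await fn(last)
    executed.push(name)
    if (last.err) break // abort: later steps never run
  }
  return { ...last, executed }
}

// Hypothetical stubs that mimic the spwn -> wait -> load sequence.
const steps = [
  { name: "spwn", fn: async () => ({ err: null, pid: "pid-123" }) },
  { name: "wait", fn: async ({ pid }) => ({ err: "timeout", pid }) },
  { name: "load", fn: async ({ pid }) => ({ err: null, pid }) },
]

const demo = runPipe(steps).then(result => {
  console.log(result.executed) // spwn and wait ran, load did not
  console.log(result.err) // the error reported by the wait stub
  return result
})
```

In this sketch the error is returned as a field of the result rather than thrown, matching the { err, res, out, pid } shape destructured above.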

bind

If a function belongs to an instance other than AO, use bind to specify that instance.

const fns = [{ fn: "post", bind: this.ar, args: { data, tags }}]

then

You can pass values between functions with then. For instance, passing the result of the previous function into the next function's arguments is a common operation.

const fns = [
  { fn: "post", bind: ao.ar, args: { data, tags }, then: ({ id, args, out })=>{
    args.tags.TxId = id // add a TxId tag to the args of the next `msg`
    out.txid = id // `out` is returned by `pipe` at the end
  }},
  { fn: "msg", args: { pid, tags }},
]
const { out: { txid } } = await ao.pipe({ fns, jwk })

If then returns a value, pipe will immediately return with that single value. You can also use err to abort pipe with an error.

const fns = [
  { fn: "msg", args: { pid, tags }, then: ({ inp })=>{
    if(inp.done) return inp.val
  }},
  { fn: "msg", args: { pid, tags }, err: ({ inp })=>{
    if(!inp.done) return "something went wrong"
  }},
]
const val = await ao.pipe({ jwk, fns })

The then callback receives the following parameters.

  • res : res returned by the previous function
  • args : args for the next function
  • out : the final out result of the pipe sequence
  • inp : out returned by the previous function
  • _ : values assigned to the fields of _ are returned by pipe as top-level fields at the end
  • pid : passed if any previous function returned a pid ( e.g. deploy )
  • mid : passed if any previous function returned a mid ( e.g. msg )
  • id : passed if any previous function returned an id ( e.g. post )

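then can also be written as a simplified hashmap object, where each key is the target path to set and each value is the source path to read from.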

let fns = [
  {
    fn: "msg",
    args: { tags },
    then: { "args.mid": "mid", "out.key": "inp.a", "_.val": "inp.b" },
  },
  { fn: "some_func", args: {} } // args.mid will be set from the previous `then`
]
const { out: { key }, val } = await ao.pipe({ jwk, fns })
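
To make the hashmap form more concrete, here is a rough sketch of how such a mapping could be resolved with dot-path lookups. The helpers getPath, setPath and applyThen are hypothetical and written only for illustration; the library's actual resolution logic may differ, and the sample context values are made up.

```javascript
// Read a nested value by dot path; missing intermediate keys yield undefined.
function getPath(obj, path) {
  return path.split(".").reduce((cur, key) => (cur == null ? undefined : cur[key]), obj)
}

// Write a nested value by dot path, creating intermediate objects as needed.
function setPath(obj, path, value) {
  const keys = path.split(".")
  const last = keys.pop()
  let cur = obj
  for (const key of keys) {
    if (cur[key] == null) cur[key] = {}
    cur = cur[key]
  }
  cur[last] = value
}

// Apply a hashmap-style mapping: each key is a target path, each value a source path.
function applyThen(mapping, ctx) {
  for (const [target, source] of Object.entries(mapping)) {
    setPath(ctx, target, getPath(ctx, source))
  }
  return ctx
}

// Hypothetical context after a `msg` call that returned mid "m-1" and out { a: 1, b: 2 }.
const ctx = { mid: "m-1", inp: { a: 1, b: 2 }, args: {}, out: {}, _: {} }
applyThen({ "args.mid": "mid", "out.key": "inp.a", "_.val": "inp.b" }, ctx)
console.log(ctx.args, ctx.out, ctx._) // args.mid is "m-1", out.key is 1, _.val is 2
```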

err

err has the same signature as then. If err returns a value, pipe will throw an Error with that value.

const fns = [
  { fn: "msg", args: { pid, tags }, err: ({ inp })=>{
    if(!inp.done) return "something went wrong!"
  }}
]
const val = await ao.pipe({ jwk, fns })
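
Because a value returned from err surfaces as a thrown Error, callers will typically wrap the pipe call in try/catch. The sketch below shows that pattern with a hypothetical stand-in runner (pipeWithErr) and a stub step, since the real pipe needs a live AO setup; it assumes the thrown Error carries the returned value as its message.

```javascript
// Hypothetical stand-in for a pipe runner, not the library's implementation.
// After each step, the step's `err` hook (if any) inspects the step's output;
// a returned value aborts the run by throwing an Error carrying that value.
async function pipeWithErr(steps) {
  let inp = {}
  for (const { fn, err } of steps) {
    inp = await fn(inp)
    const reason = err ? err({ inp }) : undefined
    if (reason !== undefined) throw new Error(reason)
  }
  return inp
}

// Stub step standing in for `msg`; it reports that the work is not done.
const failingSteps = [
  {
    fn: async () => ({ done: false }),
    err: ({ inp }) => {
      if (!inp.done) return "something went wrong!"
    },
  },
]

// The caller handles the thrown Error with try/catch.
async function runDemo() {
  try {
    await pipeWithErr(failingSteps)
    return "ok"
  } catch (e) {
    return e.message
  }
}

const outcome = runDemo()
outcome.then(message => console.log(message)) // prints "something went wrong!"
```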

cb

cb can report the current progress of pipe after every function execution.

await ao.pipe({ jwk, fns, cb: ({ i, fns, inp })=>{
  console.log(`${i} / ${fns.length} functions executed`)
}})