A Better Tomorrow, Today: A 2022 Pipeline Fanfic

The distant future: the year 2022. Usual fanfic rules apply.

R1 hosts three kinds of public resources: collections, schemas, and pipelines. Neither schemas nor pipelines are “behind” or “tied to” a specific collection; each of the three is its own top-level conceptual resource.


One thing that needs clarification right away is the difference between an “APG schema” and a “tasl schema”. An APG schema is just an instance of the schema schema; it’s a non-human-readable binary blob. You can display it in e.g. a graph visualization, but it has no corresponding text format. tasl is a text schema language; a tasl schema compiles down to an APG schema. tasl has nice things like comments and abbreviations (and maybe imports in the future). In general you can’t reverse-engineer useful tasl text from an APG schema.

For now, we use the file extensions .tasl and .schema for tasl and APG schemas, respectively.

“APG” vs “tasl” is not necessarily a great naming scheme and I’ve been considering merging those conceptual namespaces and just branding everything “tasl”, but that’s a separate discussion.


Okay, with that out of the way, what are these three kinds of resources?

Collections

A collection is a) an APG schema and b) zero or more instances of that schema. We conceptually represent collections as folders with this structure:

/
/collection.schema
/instances/
/instances/a.instance
/instances/b.instance
/instances/...

This might physically take the form of an actual directory on a filesystem, a gzipped tarball, an IPFS hash, etc. The .schema and .instance files are both binary APG serializations.

(This is so minimal! You might notice that there’s no collection.json and no migrations and no history; we probably eventually want some or all of these in the collection folder but we’re going to ignore that for now so that we can focus on other things.)

Collections are hosted at URLs, much like Git remotes. Collections are versioned automatically with major.minor.patch version numbers. We are currently living in the “Zero Era”, where every collection has major version 0, and new versions increment the minor number if they change the schema and increment the patch number if they don’t change the schema. The Zero Era will end when we introduce schema migrations or similar.
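
As a minimal sketch (just to pin down the rule; none of these names are real R1 APIs, and resetting patch on a minor bump is my assumption, borrowed from semver convention):

// Sketch of the Zero Era rule: major is pinned at 0, a schema change bumps the
// minor version (patch reset assumed, following semver), and a
// schema-preserving change bumps the patch version.
type Version = { major: 0; minor: number; patch: number }

function nextVersion(previous: Version, schemaChanged: boolean): Version {
	return schemaChanged
		? { major: 0, minor: previous.minor + 1, patch: 0 }
		: { major: 0, minor: previous.minor, patch: previous.patch + 1 }
}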

Collections are just places, as in “location”/“endpoint”/“venue”. On R1, there’s no “edit” tab on the collection page. A collection is just a place to push things to and a place to pull things from; all the collection does is accumulate its version history.

Pipelines

So what is doing the pushing and pulling? Pipelines. Pipelines are similar to build tools like Make. However, unlike Make, pipelines don’t consist of arbitrary code doing arbitrary things - a pipeline is a program in a strongly typed dataflow environment. Pipelines are composed of blocks wired together; each block can perform arbitrary side effects but must ultimately satisfy a strict external interface.

On R1 you can go to your profile and go to a list of your pipelines and click “edit pipeline” and it opens a pipeline editor. You can drag blocks around, wire them up to each other, and configure the state value for each block (more on this later).

The pipeline editor looks like something in this vein.

On the top of each pipeline page is a big red “EXECUTE PIPELINE” button. When you click the button, it executes the pipeline. You can click the button whenever you want, or maybe there are complex R1-enforced rules about who can click the button, what checklist they need to fill out beforehand, and who needs to sign off on what.

R1 gives users a “catalog” of blocks; each block has a fixed number of named “inputs” and a fixed number of named “outputs”. Wires connect outputs to inputs. An output can be the source of multiple wires, but an input can only be the target of a single wire. You should think of wires as “conducting” APG (schema, instance) pairs that flow through them, although in practice the schemas and instances are produced in two different stages. A block’s job is to receive these pairs on its inputs and send these pairs out along its outputs. Execution is done in big discrete steps (ie not incrementally on partial inputs or several inputs at once).
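
To make the wiring concrete, here’s one possible shape for the graph data that the editor manipulates - purely illustrative, not a proposed API:

// A hypothetical pipeline graph model: nodes are block instances (the "block"
// field names a catalog entry, "state" holds that instance's configuration);
// edges are wires from one node's named output to another node's named input.
// An output may fan out to many wires, but each input is the target of at
// most one wire.
interface PipelineGraph {
	nodes: Record<string, { block: string; state: unknown }>
	edges: {
		source: { node: string; output: string }
		target: { node: string; input: string }
	}[]
}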

The external interface for a block has four parts: a state type (and initial value), an input codec map, a validate function, and an evaluate function. Here we have to take another detour to explain “codecs”.


“Codec” is usually used in the context of signals or streaming, but it’s also a common generic term for the components of runtime type validators. I’ve started using “codec” as a name for “predicates over APG schemas”, or more intuitively, “schema constraints”.

Basically, a codec defines a subclass of schemas. This is very useful to do, since APG schemas are so powerful. One example is “relational schemas”. Traditional relational database schemas are equivalent to the subclass of APG schemas that satisfy these constraints:

  • Every class is a product type
  • The component types of the product type are either
    • references (foreign keys), literals, URIs, or
    • a coproduct with two options:
      • ul:none with a unit type, and
      • ul:some with a reference, literal, or URI type.

A relational codec is a function (schema: Schema) => schema is RelationalSchema that takes an arbitrary APG schema and checks whether it satisfies those constraints. (Here we’re pretending that RelationalSchema is the TypeScript type of a relational schema, and the codec is typed as a custom type guard - in general, codecs-as-predicates are typed as (schema: Schema) => schema is X for some X extends Schema.)
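
As a minimal TypeScript sketch (Schema is the APG library type used throughout this post; RelationalSchema and isRelational are hypothetical names, and the actual structural check is elided):

// A codec is a predicate over APG schemas: a type guard narrowing Schema to
// some subclass X extends Schema.
type Codec<X extends Schema = Schema> = (schema: Schema) => schema is X

// Hypothetical relational codec: checks the constraints listed above.
type RelationalSchema = Schema // stand-in; a real definition would be narrower

const isRelational: Codec<RelationalSchema> = (schema): schema is RelationalSchema => {
	// ... walk every class, checking the product/coproduct structure ...
	return true // placeholder for the real check
}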

Codecs can be confusing to think about because they introduce a second layer of “types” - schemas themselves are types that structure data, but now we’re also talking about these codecs that structure the schemas! One key difference to remember is that a schema is “perfectly specific” - it describes exactly the shape of all the data in an instance - while a codec is just a constraint that restricts you to a subclass of possible schemas.

Some trivial kinds of codecs include:

  • “any schema”
  • “is exactly equal to x schema”
  • “is assignable to x schema” (ie { foo: number; bar: string } is assignable to { foo: number }).

… although most useful codecs have more complex constraints.


Okay great! We said that a block has four parts:

  • a state type & initial value
  • an input codec map
  • a validate function
  • an evaluate function

The “state” of a block is its configuration data - every abstract block in the catalog declares the shape of its configuration data, and then whenever that block is used in a pipeline, each instance of the block is parametrized by a value of that type. This state value is what users can edit in the pipeline editor (along with editing the general graph structure of the pipeline itself), and it’s saved as part of the pipeline graph. The state of each block is supposed to be very small, and should never be “raw data” like a whole CSV or anything. If blocks need to interact with larger resources like CSVs the state should just hold URLs that point to them (hosted elsewhere).
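
For example, the state of a hypothetical “CSV import” block might look like this (illustrative names only) - note that the CSV itself is referenced by URL, not embedded:

// A hypothetical state type for a "CSV import" block: small configuration
// only, with the (potentially large) CSV referenced by URL rather than embedded.
type CsvImportState = {
	url: string // where the CSV is hosted
	columns: { header: string; datatype: string }[] // header-to-datatype map
}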

The “input codec map” just means that each block statically defines a codec for each of its named inputs. For example, a block definition might say "I can take in data from a relational schema on input foo, and data from any kind of schema on input bar, and RDF data on input baz". It would do this by supplying the appropriate codecs - one of which checks to see if a schema satisfies the relational constraints, one of which validates every schema, and one of which e.g. only validates a schema if it is exactly equivalent to some canonical “RDF schema”. Codecs are powerful!
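
Continuing the codec sketch from earlier (isRelational is the relational codec sketched above; anySchema and isRdfSchema are hypothetical), that block’s input codec map might look like:

// One codec per named input.
const anySchema: Codec = (schema): schema is Schema => true

// Hypothetical: accepts only schemas exactly equal to some canonical RDF schema.
const isRdfSchema: Codec = (schema): schema is Schema => {
	// ... compare against the canonical RDF schema ...
	return true // placeholder for the real check
}

const inputCodecs: Record<string, Codec> = {
	foo: isRelational, // relational schemas only
	bar: anySchema, // any schema at all
	baz: isRdfSchema, // RDF data only
}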

Then there are two functions: validate and evaluate.

// Schema and Instance are from the APG library.
// Instance is generic, and takes a <S extends Schema> parameter.

interface Block<
	State,
	Inputs extends Record<string, Schema>,
	Outputs extends Record<string, Schema>
> {
	validate(state: State, inputSchemas: Inputs): Outputs
	evaluate(
		state: State,
		inputSchemas: Inputs,
		inputInstances: { [i in keyof Inputs]: Instance<Inputs[i]> }
	): Promise<{ [o in keyof Outputs]: Instance<Outputs[o]> }>
}

(validate could also be called evaluateSchemas, and evaluate could also be called evaluateInstances. Maybe this would make more sense.)

The idea is that validate “executes” the schema-level (aka type-level) and returns a schema for every output, and evaluate executes the instance-level (aka value-level) and returns an instance for every output. For each output, the instance returned from evaluate must match the schema returned from validate.

If we consider the pipeline as a graph of nodes (block instances) and edges (wires from outputs to inputs), validate pipes schemas through the edges (each edge gets set to a concrete, specific schema), and then evaluate pipes instances through the edges (each edge gets set to a concrete, specific instance of its corresponding schema).

We could have combined these into a single function with a signature like this:

interface Block<
	State,
	Inputs extends Record<string, Schema>,
	Outputs extends Record<string, Schema>
> {
	execute(
		state: State,
		inputSchemas: Inputs,
		inputInstances: { [i in keyof Inputs]: Instance<Inputs[i]> }
	): Promise<{
		[o in keyof Outputs]: {
			schema: Outputs[o]
			instance: Instance<Outputs[o]>
		}
	}>
}

… however separating them is good because it lets us do typechecking for the entire pipeline - computing the schemas of every output and then checking to see if they satisfy the input codecs of the blocks they’re connected to - separately from actually trying to execute the whole thing, which might be really expensive. We want to be able to run validate over and over in the background, and we also want to protect evaluate with those rules and checklists and sign-offs mentioned earlier.
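
To make that concrete, here’s a sketch of how pipeline-level typechecking could work, building on the illustrative PipelineGraph and Codec shapes sketched above (the block catalog, the topological ordering, and every name here are assumptions for illustration, not real R1 APIs):

// Illustrative pipeline typecheck: walk block instances in topological order,
// check each incoming schema against the target input's codec, call the
// block's validate with those schemas, and record each output schema on its
// outgoing edges (keyed here by "node.output").
function typecheckPipeline(
	graph: PipelineGraph,
	catalog: Record<string, Block<any, any, any> & { codecs: Record<string, Codec> }>,
	topologicalOrder: string[]
): Map<string, Schema> {
	const edgeSchemas = new Map<string, Schema>()
	for (const nodeId of topologicalOrder) {
		const node = graph.nodes[nodeId]
		const block = catalog[node.block]
		const inputSchemas: Record<string, Schema> = {}
		for (const edge of graph.edges) {
			if (edge.target.node !== nodeId) continue
			const schema = edgeSchemas.get(`${edge.source.node}.${edge.source.output}`)!
			if (!block.codecs[edge.target.input](schema)) {
				throw new Error(`codec failed on ${nodeId}.${edge.target.input}`)
			}
			inputSchemas[edge.target.input] = schema
		}
		const outputSchemas = block.validate(node.state, inputSchemas)
		for (const [output, schema] of Object.entries(outputSchemas)) {
			edgeSchemas.set(`${nodeId}.${output}`, schema as Schema)
		}
	}
	return edgeSchemas
}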

Okay! We’re doing great. R1 hosts a catalog of these blocks, which users can wire together (and configure!) using a dataflow editor environment.

So far we’ve described blocks as if they have both inputs and outputs, but the most important blocks are actually the ones that have no inputs (sources) or no outputs (sinks). Plus, since pipeline graphs are acyclic, every graph has to include at least one source and at least one sink.

Sources are blocks like “CSV import”, which are configured by e.g. a URL to a CSV and a header-to-datatype map. The CSV import block has no inputs, and has just one output. The validate function looks at the header-to-datatype map and uses it to assemble a schema; the evaluate function fetches the URL and returns an instance of that schema.
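
Here’s a rough sketch of that block under the interface above, reusing the CsvImportState type sketched earlier (buildSchemaFromColumns and parseCsvAsInstance are hypothetical helpers, and error handling is omitted):

// Hypothetical "CSV import" source block: no inputs, one output named "table".
declare function buildSchemaFromColumns(columns: CsvImportState["columns"]): Schema
declare function parseCsvAsInstance(text: string, schema: Schema): Instance<Schema>

const csvImport: Block<CsvImportState, {}, { table: Schema }> = {
	validate(state, _inputSchemas) {
		// assemble the output schema from the header-to-datatype map alone
		return { table: buildSchemaFromColumns(state.columns) }
	},
	async evaluate(state, _inputSchemas, _inputInstances) {
		// fetch the CSV and parse it as an instance of that schema
		const response = await fetch(state.url)
		const schema = buildSchemaFromColumns(state.columns)
		return { table: parseCsvAsInstance(await response.text(), schema) }
	},
}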

Sinks are blocks like “CSV export”, which might also be configured with a URL and a header mapping, except this time it takes one input in and yields no outputs. The validate function doesn’t do anything, and the evaluate function uses the input instance and header mapping to serialize a CSV file, and then POSTs it to the URL (which is maybe then made available for download to the user). Here, we would also have to supply a “CSV schema codec” that checks whether a given input schema is serializable as a CSV or not (in this case, this would be a restriction of the relational codec that excludes foreign keys).
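
And a similarly rough sketch of the export side (CsvExportState and serializeAsCsv are hypothetical; the “CSV schema codec” just mentioned would be supplied alongside this as the codec for the "table" input):

// Hypothetical "CSV export" sink block: one input named "table", no outputs.
type CsvExportState = { url: string; headers: Record<string, string> }
declare function serializeAsCsv(instance: Instance<Schema>, headers: Record<string, string>): string

const csvExport: Block<CsvExportState, { table: Schema }, {}> = {
	validate(_state, _inputSchemas) {
		return {} // nothing to compute; the input codec already constrained the schema
	},
	async evaluate(state, _inputSchemas, inputInstances) {
		const body = serializeAsCsv(inputInstances.table, state.headers)
		await fetch(state.url, { method: "POST", body }) // hand the serialized CSV off
		return {} // no outputs
	},
}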

Another kind of source block would be “remote collection source”, where the block is configured to reference a version of a different collection (hosted on R1 or elsewhere).

And - most importantly - another kind of sink block would be “publish collection”, which is configured to point to a collection URL, takes in one input, and publishes that input as a new version of the referenced collection every time the pipeline is executed.

A pipeline might have many collection sources or no collection sources, and similarly it might publish to many collection targets or not publish to any collection targets at all. Pipelines are just the language and environment that R1 gives its users to “do things with data”; each pipeline is a recipe for doing a certain thing. Collections are the places where you stash data between pipelines; they’re the storage layer of pipeline programming.

Schemas

One thing you may have noticed is that the entire pipeline & collection story so far doesn’t mention tasl schemas at all! When you import a CSV with the CSV import block, the block internally computes an APG schema from the header map configuration - the users don’t write the schema themselves. The entire pipeline environment only deals with APG schemas and instances, so… what do people write tasl schemas for?

I’m beginning to realize that the appropriate role for tasl is less prominent than I previously thought. Not every collection is going to have a tasl schema; in fact maybe most won’t.

tasl schemas are for specifications. When a group wants to collaborate on a shared spec - one that many different people will be publishing data under - then they write a tasl schema to make that spec explicit.

(We might even consider just straight up calling them “specifications”)

There are a few different ways I could see these integrating with the rest of R1:

  • Universe A

    We let some of the blocks - like the “publish collection” block - optionally reference an R1 tasl schema. If the input schema isn’t exactly equal to the referenced R1 tasl schema version, validate fails. This enforces within a pipeline that only instances of the spec will get published to the target collection. (There’s a rough sketch of this after this list.)

    (This is a little weird because now it means there are two kinds of typechecking - validating each input schema w/r/t its codec individually, and now also the validate function itself, which we expect may fail. This might just mean that we should forget the whole idea of separate “codecs” and have validate do any and all validation itself, instead of running the codecs beforehand. This is something I’ll think about more…)

  • Universe B

    Within the pipeline editor, any edge can be pinned to an R1 tasl schema version, and validating the pipeline will fail if the schema on that edge isn’t exactly equal to the spec.

    This accomplishes the same thing as Universe A - enforcing within a pipeline that only instances of a spec are allowed in certain places - but does it in a more general way (at the expense of complicating the pipeline’s graph data model).

  • Universe C

    We let collections on R1 optionally point to an R1 tasl schema version, and attempts to push collection versions that don’t exactly match it fail. The fact that the collection points to a tasl schema is just an R1-world concept. This universe is not necessarily mutually exclusive with A or B.

In all cases, referencing a spec would also make the pipeline editor aware of the “desired” schema for certain edges, which it could theoretically use to help you autocomplete your CSV header mapping, etc.
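
For concreteness, here’s what the Universe A flavor could look like at the block level - everything here (the state shape, resolveSpec, schemasAreEqual) is hypothetical:

// Illustrative Universe A sketch: the "publish collection" block's state may
// optionally reference an R1 tasl schema version, and validate fails unless
// the input schema is exactly equal to that spec.
type PublishCollectionState = {
	collectionUrl: string
	spec?: { schemaUrl: string; version: string } // optional R1 tasl schema reference
}
declare function resolveSpec(ref: { schemaUrl: string; version: string }): Schema
declare function schemasAreEqual(a: Schema, b: Schema): boolean

function validatePublish(state: PublishCollectionState, inputSchemas: { input: Schema }): {} {
	if (state.spec !== undefined && !schemasAreEqual(inputSchemas.input, resolveSpec(state.spec))) {
		throw new Error("input schema does not exactly match the referenced spec")
	}
	return {} // the publish block has no outputs
}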


I’ve totally ignored some really important things, like identity/permalinking and metadata/provenance. This fanfic is just focused on day-to-day usage in the context of R1 as a “data lifecycle product”. I’m hoping that we can just discuss how well this vision would serve these everyday needs, and then - if it does - we can start from here and consider those bigger issues.

I’m also aware that this phases in and out of Underlay-world and R1-world, even though we said we’d shoot for more separation between the two. It all felt right though - consider this an “integration proposal” if anything.

I could literally feel myself getting less and less coherent as I wrote this so I’m sure some things aren’t very clear. :slight_smile:

Also some parts of this had a more technically explanatory mood, but don’t let those distract you from the fundamental hypotheticalness of being a fanfic!

Great to see this.

Since this isn’t the only way to encode APG schemas, “tasl” seems like a clean branding for the whole. How about tasl binary and tasl text? Is there a reason not to have a schema-level description field associated with a .schema blob?

This feels clear, and suggests that in the future there may be other facets to evaluate (such as specifications)

referencing a spec would also make the pipeline editor aware of the “desired” schema for certain edges, which it could theoretically use to help you autocomplete your CSV header mapping

+1.

all the collection does is accumulate its version history.

Anything that accumulates a version history can have an “edit” link to create a new version in context. That seems like a good thing for R1 to have. Options for editing could include rerun last pipeline, attach new pipeline, or edit pipeline, taking users to the appropriate interface for each. I have a thought about non-executable steps, perhaps better suited for its own comment.

a “catalog” of blocks

This reminds me of the LinkedPipes catalog of components + “circuit view”, which I didn’t see in your pinboard of visual inspirations. As we run across others, an index of such catalogs would be useful.

We let collections on R1 optionally point to a R1 tasl schema version, and attempts to push collection versions that don’t exactly match it fail.

Some interface that allows both specifications and collections to be updated independently of validation may be useful. You may have external grounds to update each, and it is not always obvious which needs to change to make them compatible. Being able to update your “best estimate spec” and “most complete collection” in parallel with wrestling to make the spec validate is useful.

Specs often have hierarchies, and you don’t have to go far to find parent specs that do match a collection. A creative Refine-like tool might look over your collection and identify the smallest change to {spec, collection} needed to validate against each other.

External pipeline blocks / less-executable steps

In general, data pipelines include steps that R1 won’t be able to replicate. Either they happen behind an opaque context, or they are implemented by a person or other agent running its own opaque analysis of how to transform an input to an output, or they are simply an invocation of a tool that isn’t hosted on R1. (Anything from the wide world of pipe-able toolchains that consume and produce CSVs).

So you could have a one-step R1 pipeline that says “proofread by Muriel” which requires Muriel to come look at the input and (modify +) approve the output. That could even happen in the browser via an R1-hosted interface, but can’t be executed by R1 every time someone other than Muriel presses a button. Or two pipelines A and B, connected by an OpenRefine pipeline. You might be able to write down a command-line description of the pipeline, and specify the version of OR, and could imagine adding this as an R1-native block; but in general I imagine this as something like an ‘external’ block requiring a person to affirm that the block was carried out.
In the case of a copyedit, you might have an external block for “manual edit”, with the affirmation implied by clicking “save” in an edit interface.

How would this fit into the fanfic?

I suppose one cost of external blocks is the need for additional evaluation. [and one value of integrating additional processes into R1 is minimizing this overhead]
You could capture this as one ‘internal pipeline’ for each unbroken string of internal blocks, where a string of external blocks has an IP-output as one or more of its inputs, and an IP-input (to a separate internal pipeline) as one or more of its outputs. Or perhaps capture the whole flow as one named pipeline, but archive some of its intermediate results for improved fallbacks if external blocks fail.

Here’s a related thought, still incomplete + perhaps suited to its own fanfic, but tied to the previous comment, so I’m leaving it here for now.


On data lifecycles

Imagine that a universe similar to the one described above also has popular repositories of 4 different sorts of graphs describing executable information flows, each of which could be a subgraph of the one below it:

pipelines : flows between source + sink collections, with blocks from the above catalog.
c-lines : flows b/t source + sink datasets, that involve at least one collection along the way
d-lines : flows b/t any source + sink, that involve at least one dataset along the way
u-lines : any flow of information

  • Here, pipelines could be like those in the fanfic above, but their sources and sinks are collections, and they are fully executable.
  • c-lines describe information flows between datasets, that produce or read from some collection. They can include traditional data flows that generate collections.
  • d-lines describe information flows between any source and sink, that produce or read from some dataset. They can include physical observations and outputs, and non-executable steps like curation and review.
  • u-lines describe any information flows, current or imagined, including those that have not been recorded in any way.

Part of knowledge seeking and information processing is articulating one of these flows, instantiating it from available sources, and exploring ways to make it more concrete / repeatable / structured by moving up the list and creating new flows where they don’t yet exist.

Pipelines are not needed to have a catalog of collections: a universe of c-lines suffices to generate and read from collections, and populate that part of a registry. For instance, Amazon, data.world, and GCP could publish collections describing their ~1000 open datasets; likewise Crossref could watch a feed of new collections and generate DOIs for them as they appear.

But any c-line that contains two successive collections could have the edge between them replaced by (or expanded into) a pipeline. Just as edges in a pipeline might reference a desired human-readable schema, edges in a c-line might reference a pipeline to identify a desired block-circuit.

Describing data lifecycles

Current data lifecycles can generally be described with a d-line. If a d-line contains two successive datasets, the edge between them could be a c-line. This set of concepts gives us a way, when considering examples, to talk about how to get from that sort of lifecycle graph to versions of it that include collections, and pipelines connecting those collections, while keeping the core concepts narrowly scoped.

In general, a d-line is not fully describable as a single pipeline – its line will leave the realm of collections at times, and the realm of datasets at others. But we can think about ways to maximize what proportion of links in a graph are pipelines, and to minimize the # of separate pipelines / excursions out of the pipe network, and when that is helpful / what that affords.

Connections between dataset nodes in a d-line diagram can be c-lines; connections between collection nodes in these can be pipelines. The inverse is not true: pipelines in this universe do not have a way to represent or address most c-line or d-line nodes or links.

Flow diagrams as resources

The graphs of all of these lines – the flow diagrams themselves – are useful to name, reference, and annotate. Nodes and flows are each top-level conceptual resources. It should be possible to look at a set of sources and sinks and find all flows that connect them. It should also be possible to look at the diff between two flows.

Conceptual flows, implemented by successive flow diagrams

Finally, u-lines are general enough to encompass one of the most common ways to diagram an information flow: an abstraction of sources, processes, and sinks that captures the desired outcome of a flow without specifying how it happens.

For instance “We will estimate the sun’s temperature from its light, using this scope to record images to this computer, and analyzing it with these scripts” is a well defined u-line. It could be realized with a wide range of different datasets + formats, observed over a variety of times, running the same scripts w/ different parameters. After choosing the datasets you want to use, any subset of them could be made into schemas + collections. And some subset of those collections could be piped to an output collection of temperature estimates.

Useful impacts of turning what could be a pen-and-paper exercise into a structured pipeline may include:

  • reuse: creating interim transformations, collections, and schemas that are reusable and broadly useful for a range of adjacent problems
  • replicability: creating a repeatable process that is inherently, even provably replicable
  • error checking: identifying the points in the overall process where observations and work are logged in an increasingly auditable and validatable way
    – creating a sequence of spot-checkable steps to allow for faster stochastic testing
    – moving edges up the ladder of specificity may reduce certain types of error or speed catching and debugging