Basic Pipeline Operations

This document covers the fundamental operation of basic (non-nested) transmissions in the Transmissions framework.

Overview

A basic transmission is a linear sequence of processors connected together, where data flows from one processor to the next. Each processor transforms the message and passes it to the next processor in the chain.

Architecture Components

TransmissionBuilder

The TransmissionBuilder is responsible for constructing transmission objects from RDF configuration data:

Transmission

The Transmission class manages the execution of a processor pipeline:

Processor

Individual processing units that transform messages:

Connector

Links processors together in the pipeline:

Configuration Format

Basic transmissions are defined in RDF/Turtle format:

@prefix trn: <http://purl.org/stuff/transmissions/> .

:myTransmission a trn:Transmission ;
    trn:pipe (:processorA :processorB :processorC) .

:processorA a :ProcessorTypeA .
:processorB a :ProcessorTypeB .
:processorC a :ProcessorTypeC .

Execution Flow

1. Construction Phase

TransmissionBuilder.buildTransmissions()
├── For each trn:Transmission in dataset:
│   ├── constructTransmission(transmissionID)
│   │   ├── Create new Transmission instance
│   │   ├── Extract pipe nodes using GrapoiHelpers.listToArray()
│   │   ├── createNodes(transmission, pipenodes)
│   │   │   └── For each node:
│   │   │       ├── Check if node type equals trn:Transmission (false for basic)
│   │   │       ├── createProcessor(processorType)
│   │   │       └── transmission.register(nodeID, processor)
│   │   └── connectNodes(transmission, pipenodes)
│   │       └── For each adjacent pair:
│   │           └── transmission.connect(leftNode, rightNode)
│   └── Set transmission.app reference
└── Return array of built transmissions

2. Runtime Execution

Transmission.process(message)
├── Identify first processor (connectors[0].fromName || first key)
├── Get processor instance: transmission.get(processorName)
├── If connectors exist (pipeline mode):
│   ├── Follow connector chain to find last processor
│   ├── Set up event listener on last processor for final result
│   ├── Start pipeline: firstProcessor.receive(message)
│   └── Return Promise that resolves with final message
└── Else (single processor):
    └── Return processor.receive(message)

3. Message Flow

processor1.receive(message)
├── ProcessorImpl.processMessage()
├── processor1.process(transformedMessage)
├── processor1.emit('message', result)
└── Triggers connector to processor2
    ├── processor2.receive(result)
    ├── processor2.process(result)
    ├── processor2.emit('message', result2)
    └── Continue until last processor

Key Implementation Details

Processor Registration

transmission.register(processorName, instance)

Connection Setup

// In Connector.connect()
fromProcessor.on('message', async (message) => {
    logger.log(`|-> ${ns.shortName(toProcessor.id)} a ${toProcessor.constructor.name}`)
    await toProcessor.receive(message)
})

Error Handling

Example: Simple NOP Pipeline

Configuration:

:simpleTest a trn:Transmission ;
    trn:pipe (:nop1 :nop2 :nop3) .

:nop1 a :NOP .
:nop2 a :NOP .
:nop3 a :NOP .

Execution:

  1. Message sent to nop1.receive(message)
  2. nop1 processes and emits result
  3. Connector forwards to nop2.receive(result)
  4. nop2 processes and emits result
  5. Connector forwards to nop3.receive(result)
  6. nop3 processes and emits final result
  7. Promise resolves with final result

Performance Considerations

Debugging

Use verbose mode to trace execution:

./trans -v myApp

Key log indicators: