This document covers the fundamental operation of basic (non-nested) transmissions in the Transmissions framework.
A basic transmission is a linear sequence of processors connected together, where data flows from one processor to the next. Each processor transforms the message and passes it to the next processor in the chain.
The TransmissionBuilder is responsible for constructing transmission objects from RDF configuration data:
trn:Transmission
The Transmission class manages the execution of a processor pipeline:
Connector objects defining the pipeline flow
Individual processing units that transform messages:
ProcessorImpl, which provides event emission capabilities
process() method for message transformation
Links processors together in the pipeline:
Basic transmissions are defined in RDF/Turtle format:
@prefix trn: <http://purl.org/stuff/transmissions/> .
:myTransmission a trn:Transmission ;
    trn:pipe (:processorA :processorB :processorC) .
:processorA a :ProcessorTypeA .
:processorB a :ProcessorTypeB .
:processorC a :ProcessorTypeC .
TransmissionBuilder.buildTransmissions()
├── For each trn:Transmission in dataset:
│   ├── constructTransmission(transmissionID)
│   │   ├── Create new Transmission instance
│   │   ├── Extract pipe nodes using GrapoiHelpers.listToArray()
│   │   ├── createNodes(transmission, pipenodes)
│   │   │   └── For each node:
│   │   │       ├── Check if node type equals trn:Transmission (false for basic)
│   │   │       ├── createProcessor(processorType)
│   │   │       └── transmission.register(nodeID, processor)
│   │   └── connectNodes(transmission, pipenodes)
│   │       └── For each adjacent pair:
│   │           └── transmission.connect(leftNode, rightNode)
│   └── Set transmission.app reference
└── Return array of built transmissions
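The build steps above can be sketched in a few lines of JavaScript. This is a minimal illustration, not the framework's code: `SketchTransmission`, the `processorTypes` registry, and the plain `{ id, type }` node objects are all hypothetical stand-ins for the real classes and for the RDF nodes read from the dataset.

```javascript
// Hypothetical stand-in for the Transmission class: a registry plus a connector list.
class SketchTransmission {
  constructor(id) {
    this.id = id
    this.processors = {} // name -> processor instance
    this.connectors = [] // { fromName, toName } pairs in pipeline order
  }

  register(name, instance) {
    this.processors[name] = instance
    return instance
  }

  connect(fromName, toName) {
    this.connectors.push({ fromName, toName })
  }
}

// Assumed registry mapping a type name to a factory (illustration only).
const processorTypes = {
  ProcessorTypeA: () => ({ type: 'ProcessorTypeA' }),
  ProcessorTypeB: () => ({ type: 'ProcessorTypeB' }),
  ProcessorTypeC: () => ({ type: 'ProcessorTypeC' })
}

function createProcessor(type) {
  const factory = processorTypes[type]
  if (!factory) throw new Error(`Unknown processor type: ${type}`)
  return factory()
}

// pipeNodes plays the role of the trn:pipe list after conversion to an array.
function constructTransmission(id, pipeNodes) {
  const transmission = new SketchTransmission(id)

  // createNodes: one registered processor per pipe node
  for (const node of pipeNodes) {
    transmission.register(node.id, createProcessor(node.type))
  }

  // connectNodes: one connector per adjacent pair
  for (let i = 0; i < pipeNodes.length - 1; i++) {
    transmission.connect(pipeNodes[i].id, pipeNodes[i + 1].id)
  }

  return transmission
}

const built = constructTransmission('myTransmission', [
  { id: 'processorA', type: 'ProcessorTypeA' },
  { id: 'processorB', type: 'ProcessorTypeB' },
  { id: 'processorC', type: 'ProcessorTypeC' }
])
console.log(built.connectors)
```

A pipe of n nodes yields n registered processors and n - 1 connectors, so a single-node pipe ends up with an empty connector list.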
Transmission.process(message)
├── Identify first processor (connectors[0].fromName || first key)
├── Get processor instance: transmission.get(processorName)
├── If connectors exist (pipeline mode):
│   ├── Follow connector chain to find last processor
│   ├── Set up event listener on last processor for final result
│   ├── Start pipeline: firstProcessor.receive(message)
│   └── Return Promise that resolves with final message
└── Else (single processor):
    └── Return processor.receive(message)
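The execution steps above might look roughly like the following. It is a sketch under assumptions: `StepProcessor`, `wire`, and `processPipeline` are hypothetical names, processors are plain Node.js `EventEmitter` subclasses, and the pipe is assumed to be linear with no cycles.

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical processor: records its name on the message, then emits the result.
class StepProcessor extends EventEmitter {
  constructor(name) {
    super()
    this.name = name
  }

  async receive(message) {
    const result = { ...message, path: [...(message.path || []), this.name] }
    this.emit('message', result)
    return result
  }
}

// Forward each processor's output to the processor its connector points at.
function wire(processors, connectors) {
  for (const { fromName, toName } of connectors) {
    processors[fromName].on('message', (message) => processors[toName].receive(message))
  }
}

function processPipeline(processors, connectors, message) {
  const firstName = connectors.length > 0 ? connectors[0].fromName : Object.keys(processors)[0]
  const first = processors[firstName]

  // Single processor: there is no chain to follow.
  if (connectors.length === 0) return first.receive(message)

  // Walk the connectors until reaching a processor with no outgoing link.
  let lastName = firstName
  let link = connectors.find((c) => c.fromName === lastName)
  while (link) {
    lastName = link.toName
    link = connectors.find((c) => c.fromName === lastName)
  }

  return new Promise((resolve, reject) => {
    processors[lastName].once('message', resolve) // listen before starting the pipeline
    first.receive(message).catch(reject)
  })
}

const processors = {
  a: new StepProcessor('a'),
  b: new StepProcessor('b'),
  c: new StepProcessor('c')
}
const connectors = [
  { fromName: 'a', toName: 'b' },
  { fromName: 'b', toName: 'c' }
]
wire(processors, connectors)

const finalMessage = processPipeline(processors, connectors, { payload: 'hello' })
finalMessage.then((message) => console.log(message.path)) // [ 'a', 'b', 'c' ]
```

Registering the listener on the last processor before calling `receive` on the first one matters here, because `emit` runs its listeners synchronously and the final message could otherwise be emitted before anyone is listening.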
processor1.receive(message)
├── ProcessorImpl.processMessage()
├── processor1.process(transformedMessage)
├── processor1.emit('message', result)
└── Triggers connector to processor2
    ├── processor2.receive(result)
    ├── processor2.process(result)
    ├── processor2.emit('message', result2)
    └── Continue until last processor
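One way to picture the receive, process, emit cycle shown above is a small base class that subclasses customise through `process()`. The names `SketchProcessor`, `prepare`, `Uppercase`, and `Exclaim` are hypothetical, and `prepare` is only a placeholder for whatever `ProcessorImpl.processMessage()` actually does, which this document does not specify.

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical base class: receive() prepares the message, runs process(), emits the result.
class SketchProcessor extends EventEmitter {
  async receive(message) {
    const prepared = this.prepare(message) // placeholder for ProcessorImpl.processMessage()
    const result = await this.process(prepared)
    this.emit('message', result)
    return result
  }

  // Assumption: a shallow copy, so a processor does not mutate its input.
  prepare(message) {
    return { ...message }
  }

  // Default behaviour: pass the message through unchanged.
  async process(message) {
    return message
  }
}

class Uppercase extends SketchProcessor {
  async process(message) {
    return { ...message, content: String(message.content).toUpperCase() }
  }
}

class Exclaim extends SketchProcessor {
  async process(message) {
    return { ...message, content: `${message.content}!` }
  }
}

const upper = new Uppercase()
const exclaim = new Exclaim()

// Stand-in for a connector: the first processor's output feeds the second.
upper.on('message', (message) => exclaim.receive(message))

const done = new Promise((resolve) => exclaim.once('message', resolve))
upper.receive({ content: 'hello' })
done.then((message) => console.log(message.content)) // HELLO!
```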
transmission.register(processorName, instance)
transmission.processors[processorName]
// In Connector.connect()
fromProcessor.on('message', async (message) => {
    logger.log(`|-> ${ns.shortName(toProcessor.id)} a ${toProcessor.constructor.name}`)
    await toProcessor.receive(message)
})
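For context, the listener above could sit inside a connector object along these lines. `SketchConnector` is a hypothetical class, and the `try`/`catch` that re-emits failures as an `error` event is an assumption added for the sketch, not something the source shows. It is included because Node.js `EventEmitter` does not await async listeners, so a rejected `receive()` would otherwise surface only as an unhandled rejection.

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical connector: stores two processor names and wires them on connect().
class SketchConnector {
  constructor(fromName, toName) {
    this.fromName = fromName
    this.toName = toName
  }

  connect(processors) {
    const from = processors[this.fromName]
    const to = processors[this.toName]
    if (!from || !to) {
      throw new Error(`Cannot connect ${this.fromName} -> ${this.toName}`)
    }

    from.on('message', async (message) => {
      try {
        await to.receive(message)
      } catch (err) {
        // Assumption: report downstream failures on the upstream emitter.
        from.emit('error', err)
      }
    })
  }
}

// Minimal stand-ins for two processors.
const source = new EventEmitter()
const sink = {
  received: [],
  async receive(message) {
    if (message.fail) throw new Error('sink failed')
    this.received.push(message)
  }
}

const errors = []
source.on('error', (err) => errors.push(err.message))

new SketchConnector('source', 'sink').connect({ source, sink })
source.emit('message', { n: 1 })
source.emit('message', { fail: true })
```

In this sketch an `error` listener must be attached to the upstream emitter, since emitting `error` with no listener throws.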
The transmissionStack property tracks the execution path
Configuration:
:simpleTest a trn:Transmission ;
    trn:pipe (:nop1 :nop2 :nop3) .
:nop1 a :NOP .
:nop2 a :NOP .
:nop3 a :NOP .
Execution:
nop1.receive(message)
nop1 processes and emits result
nop2.receive(result)
nop2 processes and emits result
nop3.receive(result)
nop3 processes and emits final result
Use verbose mode to trace execution:
./trans -v myApp
Key log indicators:
+ ***** Construct Transmission: Transmission being built
Creating processor: Individual processor instantiation
> Connect: Pipeline connections being established
|-> processorName a ProcessorType: Message flow between processors