This document covers the fundamental operation of basic (non-nested) transmissions in the Transmissions framework.
A basic transmission is a linear sequence of processors connected together, where data flows from one processor to the next. Each processor transforms the message and passes it to the next processor in the chain.
The TransmissionBuilder is responsible for constructing transmission objects from RDF configuration data:
trn:Transmission
The Transmission class manages the execution of a processor pipeline:
Connector objects defining the pipeline flow
Processors are individual processing units that transform messages:
ProcessorImpl which provides event emission capabilities
process() method for message transformation
Connectors link processors together in the pipeline.
Basic transmissions are defined in RDF/Turtle format:
@prefix trn: <http://purl.org/stuff/transmissions/> .
:myTransmission a trn:Transmission ;
    trn:pipe (:processorA :processorB :processorC) .
:processorA a :ProcessorTypeA .
:processorB a :ProcessorTypeB .
:processorC a :ProcessorTypeC .
TransmissionBuilder.buildTransmissions()
├── For each trn:Transmission in dataset:
│   ├── constructTransmission(transmissionID)
│   │   ├── Create new Transmission instance
│   │   ├── Extract pipe nodes using GrapoiHelpers.listToArray()
│   │   ├── createNodes(transmission, pipenodes)
│   │   │   └── For each node:
│   │   │       ├── Check if node type equals trn:Transmission (false for basic)
│   │   │       ├── createProcessor(processorType)
│   │   │       └── transmission.register(nodeID, processor)
│   │   └── connectNodes(transmission, pipenodes)
│   │       └── For each adjacent pair:
│   │           └── transmission.connect(leftNode, rightNode)
│   └── Set transmission.app reference
└── Return array of built transmissions
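The construction steps above can be sketched in plain JavaScript. This is a minimal illustration, not the framework's code: SketchTransmission, the typeOf map, and the createProcessor factory argument are hypothetical stand-ins, and the RDF list extraction is replaced by a ready-made array of node names.

```javascript
// Hypothetical stand-in for the framework's Transmission class (sketch only).
class SketchTransmission {
  constructor(id) {
    this.id = id
    this.processors = {} // processorName -> instance
    this.connectors = [] // ordered { fromName, toName } pairs
  }

  register(name, instance) {
    this.processors[name] = instance
    return instance
  }

  connect(fromName, toName) {
    this.connectors.push({ fromName, toName })
  }
}

// pipe: ordered node names, as a trn:pipe list would yield them.
// typeOf: node name -> processor type name (assumed lookup for the type triples).
// createProcessor: hypothetical factory, type name -> processor instance.
function constructTransmission(id, pipe, typeOf, createProcessor) {
  const transmission = new SketchTransmission(id)

  // createNodes: one processor per pipe entry
  for (const name of pipe) {
    transmission.register(name, createProcessor(typeOf[name]))
  }

  // connectNodes: link each adjacent pair, left to right
  for (let i = 0; i < pipe.length - 1; i++) {
    transmission.connect(pipe[i], pipe[i + 1])
  }

  return transmission
}

const built = constructTransmission(
  'myTransmission',
  ['processorA', 'processorB', 'processorC'],
  {
    processorA: 'ProcessorTypeA',
    processorB: 'ProcessorTypeB',
    processorC: 'ProcessorTypeC'
  },
  (type) => ({ type }) // placeholder processor object
)
console.log(built.connectors)
```

A pipe of n nodes produces n - 1 connectors, so the three-node pipe here yields two: processorA to processorB, and processorB to processorC.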
Transmission.process(message)
├── Identify first processor (connectors[0].fromName || first key)
├── Get processor instance: transmission.get(processorName)
├── If connectors exist (pipeline mode):
│   ├── Follow connector chain to find last processor
│   ├── Set up event listener on last processor for final result
│   ├── Start pipeline: firstProcessor.receive(message)
│   └── Return Promise that resolves with final message
└── Else (single processor):
    └── Return processor.receive(message)
processor1.receive(message)
├── ProcessorImpl.processMessage()
├── processor1.process(transformedMessage)
├── processor1.emit('message', result)
└── Triggers connector to processor2
    ├── processor2.receive(result)
    ├── processor2.process(result)
    ├── processor2.emit('message', result2)
    └── Continue until last processor
transmission.register(processorName, instance)
transmission.processors[processorName]
// In Connector.connect()
fromProcessor.on('message', async (message) => {
  logger.log(`|-> ${ns.shortName(toProcessor.id)} a ${toProcessor.constructor.name}`)
  await toProcessor.receive(message)
})
transmissionStack property tracks the execution path
Configuration:
:simpleTest a trn:Transmission ;
    trn:pipe (:nop1 :nop2 :nop3) .
:nop1 a :NOP .
:nop2 a :NOP .
:nop3 a :NOP .
Execution:
nop1.receive(message)
nop1 processes and emits result
nop2.receive(result)
nop2 processes and emits result
nop3.receive(result)
nop3 processes and emits final result
Use verbose mode to trace execution:
./trans -v myApp
Key log indicators:
+ ***** Construct Transmission: Transmission being built
Creating processor: Individual processor instantiation
> Connect: Pipeline connections being established
|-> processorName a ProcessorType: Message flow between processors