This document covers the operation of nested transmissions in the Transmissions framework, where transmissions can contain other transmissions as processors.
Nested transmissions allow complex, hierarchical data processing workflows where entire transmission pipelines can be embedded as single nodes within parent transmissions. This enables modular, reusable pipeline components and sophisticated branching/merging patterns.
The TransmissionBuilder
supports recursive transmission construction:
trn:Transmission
constructTransmission()
recursively for nested transmissionsMAX_NESTING_DEPTH = 10
transmissionCache
to avoid rebuilding the same transmission multiple timesNested transmissions form parent-child relationships:
The Connector
class handles three connection scenarios:
New methods for nested transmission operations:
Nested transmissions use the same RDF format but include transmission references:
@prefix trn: <http://purl.org/stuff/transmissions/> .
# Main transmission with nested sub-transmissions
:mainPipeline a trn:Transmission ;
trn:pipe (:subPipeA :subPipeB :subPipeC) .
# Nested transmission definitions
:subPipeA a trn:Transmission ;
trn:pipe (:s1 :s2 :s3) .
:subPipeB a trn:Transmission ;
trn:pipe (:s3 :s104 :s105) .
:subPipeC a trn:Transmission ;
trn:pipe (:s3 :s204 :s205) .
# Individual processors
:s1 a :NOP .
:s2 a :NOP .
:s3 a :NOP .
:s104 a :NOP .
:s105 a :NOP .
:s204 a :NOP .
:s205 a :ShowTransmission .
TransmissionBuilder.buildTransmissions()
├── For each top-level trn:Transmission:
│ └── constructTransmission(transmissionID)
│ ├── Create Transmission instance
│ ├── Extract pipe nodes
│ ├── createNodes(transmission, pipenodes)
│ │ └── For each node:
│ │ ├── Check: processorType.equals(ns.trn.Transmission)
│ │ ├── If TRUE (nested transmission):
│ │ │ ├── nestedTx = constructTransmission(node) [RECURSIVE]
│ │ │ ├── nestedTx.parent = currentTransmission
│ │ │ ├── nestedTx.path = [...parent.path, nodeName]
│ │ │ └── transmission.register(nodeID, nestedTx)
│ │ └── If FALSE: create regular processor
│ └── connectNodes() - handles mixed processor/transmission connections
└── Return transmission hierarchy
MainTransmission.process(message)
├── Find first processor: "subPipeA" (a Transmission)
├── Setup Promise with nested transmission handling:
│ ├── Follow connector chain to find last processor
│ ├── If lastProcessor instanceof Transmission:
│ │ └── actualLastProcessor = lastProcessor.getLastNode()
│ ├── Setup listener: actualLastProcessor.on('message', resolve)
│ ├── If firstProcessor instanceof Transmission:
│ │ └── actualFirstProcessor = firstProcessor.getFirstNode()
│ └── Start: actualFirstProcessor.receive(message)
└── Message flows through nested hierarchy
message → MainTransmission
├── pipeA (Transmission)
│ ├── s1.receive(message)
│ ├── s1 → s2 → s3
│ └── s3.emit('message', result1)
├── Connector: pipeA.lastNode → pipeB.firstNode
├── pipeB (Transmission)
│ ├── s3.receive(result1)
│ ├── s3 → s104 → s105
│ └── s105.emit('message', result2)
├── Connector: pipeB.lastNode → pipeC.firstNode
└── pipeC (Transmission)
├── s3.receive(result2)
├── s3 → s204 → s205
└── s205.emit('message', finalResult)
// In TransmissionBuilder.createNodes()
const isTransmissionReference = processorType.equals(ns.trn.Transmission)
if (isTransmissionReference) {
const nestedTransmission = await this.constructTransmission(
transmissionsDataset,
node, // Critical: pass node (transmission ID), not processorType
configDataset
)
transmission.register(node.value, nestedTransmission)
}
// In Transmission.register()
if (processor instanceof Transmission) {
processor.parent = this
processor.path = [...this.path, processorName]
this.children.add(processor)
}
// Get first processor in transmission
getFirstNode() {
const processorName = this.connectors[0]?.fromName || Object.keys(this.processors)[0]
return this.get(processorName)
}
// Get last processor in transmission
getLastNode() {
const processorNames = Object.keys(this.processors)
for (const name of processorNames) {
const isSource = this.connectors.some(c => c.fromName === name)
if (!isSource) {
return this.get(name)
}
}
// Fallback to last processor
const lastProcessorName = processorNames[processorNames.length - 1]
return this.get(lastProcessorName)
}
// In Connector.connect()
if (fromProcessor instanceof Transmission && toProcessor instanceof Transmission) {
// Transmission → Transmission
const lastNode = fromProcessor.getLastNode()
const firstNode = toProcessor.getFirstNode()
lastNode.on('message', async (message) => {
await firstNode.receive(message)
})
} else if (fromProcessor instanceof Transmission) {
// Transmission → Processor
const lastNode = fromProcessor.getLastNode()
lastNode.on('message', async (message) => {
await toProcessor.receive(message)
})
} else if (toProcessor instanceof Transmission) {
// Processor → Transmission
fromProcessor.on('message', async (message) => {
const firstNode = toProcessor.getFirstNode()
await firstNode.receive(message)
})
}
:workflow a trn:Transmission ;
trn:pipe (:dataExtraction :dataTransformation :dataOutput) .
Each step is a complete transmission with its own internal pipeline.
:parallelProcessor a trn:Transmission ;
trn:pipe (:input :branchA :branchB :branchC :aggregator) .
Where branchA, branchB, branchC are parallel transmission branches.
:conditionalFlow a trn:Transmission ;
trn:pipe (:router :pathA :pathB :merger) .
Router determines which nested transmission path to follow.
// Error propagation includes nesting information
error.transmissionStack = error.transmissionStack || []
error.transmissionStack.push(this.id)
if (++this.currentDepth > this.MAX_NESTING_DEPTH) {
throw new Error(`Maximum transmission nesting depth of ${this.MAX_NESTING_DEPTH} exceeded`)
}
Verbose mode shows nested construction:
+ ***** Construct Transmission : <main-transmission>
*** Constructing nested transmission for node: pipeA
+ ***** Construct Transmission : <pipeA>
Creating processor: s1 Type: NOP
*** Registering nested transmission: pipeA -> pipeA
Example test execution:
./trans -v multi-pipe
Expected flow indicators:
*** Constructing nested transmission
: Recursive build*** Registering nested transmission
: Hierarchy establishment|-> pipeA a Transmission
: Nested transmission execution[s1.s2.s3.s3.s104]
indicates path through hierarchy