This guide explains how to create custom processors to extend the Transmissions framework with new functionality.
Processors are the building blocks of Transmissions pipelines. Each processor:
- Extends the Processor class
- Implements a process(message) method

Processors are organized by category in src/processors/:
// src/processors/text/MyProcessor.js
import logger from '../../utils/Logger.js'
import ns from '../../utils/ns.js'
import Processor from '../../model/Processor.js'
/**
* @class MyProcessor
* @extends Processor
* @classdesc
* **a Transmissions Processor**
*
* Brief description of what this processor does.
*
* ### Processor Signature
*
* #### __*Settings*__
* * **`ns.trn.settingName`** - Description of setting
*
* #### __*Input*__
* * **`message`** - The message object to process
*
* #### __*Output*__
* * **`message`** - The processed message object
*
* #### __*Behavior*__
* * Describe what the processor does
*
* #### __*Side Effects*__
* * List any side effects
*/
class MyProcessor extends Processor {
    constructor(config) {
        super(config)
    }

    async process(message) {
        logger.debug(`MyProcessor.process starting`)

        // Get configuration
        const setting = this.getProperty(ns.trn.settingName, 'default-value')

        // Process the message
        // ... your logic here ...

        // Emit result to next processor
        return this.emit('message', message)
    }
}
export default MyProcessor
Add your processor to the appropriate factory:
// src/processors/text/TextProcessorsFactory.js
import ns from '../../utils/ns.js'
import MyProcessor from './MyProcessor.js'

class TextProcessorsFactory {
    static createProcessor(type, config) {
        if (type.equals(ns.trn.MyProcessor)) {
            return new MyProcessor(config)
        }
        // ... other processors
    }
}

export default TextProcessorsFactory
Register your category in the main processor factory:
// src/processors/ProcessorFactory.js
import TextProcessorsFactory from './text/TextProcessorsFactory.js'

class ProcessorFactory {
    static createProcessor(type, config) {
        // Try text processors
        let processor = TextProcessorsFactory.createProcessor(type, config)
        if (processor) return processor

        // ... other factories
    }
}

export default ProcessorFactory
Use getProperty() to read configuration:
// Required setting (throws if missing)
const filename = this.getProperty(ns.trn.filename)
// Optional with default
const encoding = this.getProperty(ns.trn.encoding, 'utf8')
// Check if setting exists
if (this.hasProperty(ns.trn.optionalSetting)) {
    const value = this.getProperty(ns.trn.optionalSetting)
}
Settings can come from message properties or configuration. Message properties take priority:
// This will use message.filename if it exists, otherwise config setting
const filename = this.getProperty(ns.trn.filename, 'default.txt')
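Spelled out, the lookup order is: message property first, then the configured setting, then the supplied default. A small sketch with illustrative values only:

async process(message) {
    // message.filename === 'report.txt'          -> 'report.txt'
    // no message.filename, :filename configured  -> the configured value
    // neither present                            -> 'default.txt'
    const filename = this.getProperty(ns.trn.filename, 'default.txt')
    logger.debug(`Resolved filename: ${filename}`)
    return this.emit('message', message)
}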
Handle complex settings:
// Array settings
const items = this.getProperty(ns.trn.items, [])
// Object settings
const options = this.getProperty(ns.trn.options, {})
// Nested properties
const database = {
    host: this.getProperty(ns.trn.dbHost, 'localhost'),
    port: this.getProperty(ns.trn.dbPort, 5432),
    name: this.getProperty(ns.trn.dbName)
}
Transform message content:
async process(message) {
    const input = message.content || ''
    const result = input.toUpperCase()
    message.content = result
    return this.emit('message', message)
}
Add properties to messages:
async process(message) {
    const fieldName = this.getProperty(ns.trn.field)
    const fieldValue = this.getProperty(ns.trn.value)
    message[fieldName] = fieldValue
    return this.emit('message', message)
}
Skip or modify based on conditions:
async process(message) {
    // Skip if message is marked as done
    if (message.done) {
        logger.debug('Message marked as done, skipping')
        return this.emit('message', message)
    }

    // Process normally
    // ... processing logic ...
    return this.emit('message', message)
}
Handle asynchronous operations:
async process(message) {
    try {
        const url = this.getProperty(ns.trn.url)
        const response = await fetch(url)
        const data = await response.json()
        message.apiResponse = data
        return this.emit('message', message)
    } catch (error) {
        logger.error(`API request failed: ${error.message}`)
        message.error = error.message
        return this.emit('message', message)
    }
}
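Note that fetch() resolves successfully for HTTP error statuses such as 404 or 500, so you may also want to treat non-2xx responses as failures. A possible variant of the example above (a sketch, not framework-mandated behaviour):

async process(message) {
    try {
        const url = this.getProperty(ns.trn.url)
        const response = await fetch(url)

        // fetch() only rejects on network errors, so check the status explicitly
        if (!response.ok) {
            throw new Error(`HTTP ${response.status} for ${url}`)
        }

        message.apiResponse = await response.json()
    } catch (error) {
        logger.error(`API request failed: ${error.message}`)
        message.error = error.message
    }
    return this.emit('message', message)
}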
Handle errors without breaking the pipeline:
async process(message) {
    try {
        // Risky operation
        const result = await this.riskyOperation(message)
        message.result = result
    } catch (error) {
        logger.warn(`Operation failed: ${error.message}`)
        message.error = error.message
        // Continue processing despite error
    }
    return this.emit('message', message)
}
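Not every error should be swallowed. When a failure means the rest of the pipeline cannot do anything useful, rethrowing lets it surface instead of travelling on inside the message. A sketch of that distinction (the recoverable flag on the error is purely illustrative):

async process(message) {
    try {
        message.result = await this.riskyOperation(message)
    } catch (error) {
        if (error.recoverable) {
            // Record recoverable problems on the message and keep going
            logger.warn(`Operation failed, continuing: ${error.message}`)
            message.error = error.message
        } else {
            // Let fatal errors propagate instead of hiding them in the message
            logger.error(`Fatal error in MyProcessor: ${error.message}`)
            throw error
        }
    }
    return this.emit('message', message)
}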
Validate inputs early:
async process(message) {
    const required = this.getProperty(ns.trn.requiredField)
    if (!required) {
        throw new Error('Required field is missing')
    }

    // Process with validated input
    // ...
}
Manage external resources:
class DatabaseProcessor extends Processor {
    constructor(config) {
        super(config)
        this.connection = null
    }

    async initialize() {
        const host = this.getProperty(ns.trn.dbHost)
        this.connection = await connectToDatabase(host)
    }

    async process(message) {
        if (!this.connection) {
            await this.initialize()
        }
        const result = await this.connection.query(message.query)
        message.result = result
        return this.emit('message', message)
    }

    async cleanup() {
        if (this.connection) {
            await this.connection.close()
            this.connection = null
        }
    }
}
Maintain state across messages:
class AccumulatorProcessor extends Processor {
    constructor(config) {
        super(config)
        this.accumulated = []
    }

    async process(message) {
        // Add to accumulator
        this.accumulated.push(message.data)

        // Check if we should emit
        const threshold = this.getProperty(ns.trn.threshold, 10)
        if (this.accumulated.length >= threshold) {
            message.batch = [...this.accumulated]
            this.accumulated = [] // Reset
            return this.emit('message', message)
        }

        // Don't emit yet
        return Promise.resolve()
    }
}
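If the message stream ends while items are still buffered, they would be lost. One option is to flush the remainder from the cleanup() hook shown in the DatabaseProcessor example; whether and when the framework invokes cleanup() at shutdown is an assumption here, so treat this as a sketch:

async cleanup() {
    // Assumes cleanup() is invoked when the pipeline shuts down
    if (this.accumulated.length > 0) {
        const message = { batch: [...this.accumulated] }
        this.accumulated = []
        return this.emit('message', message)
    }
}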
Emit multiple messages:
class SplitterProcessor extends Processor {
    async process(message) {
        const items = message.items || []

        // Emit one message per item
        for (const item of items) {
            const newMessage = { ...message, currentItem: item }
            await this.emit('message', newMessage)
        }
        return Promise.resolve()
    }
}
Create focused tests for your processor:
// tests/unit/processors/MyProcessor.test.js
import { expect } from 'chai'
import MyProcessor from '../../../src/processors/text/MyProcessor.js'

describe('MyProcessor', () => {
    it('should transform message content', async () => {
        const config = { /* test config */ }
        const processor = new MyProcessor(config)
        const message = { content: 'input' }

        await processor.process(message)

        // The processor mutates the message in place
        expect(message.content).to.equal('expected output')
    })
})
Test with a real pipeline:
# src/apps/test/my-processor/transmissions.ttl
:test-transmission a :EntryTransmission ;
    :pipe (
        :setup-test
        :my-processor
        :verify-result
    ) .

:my-processor a :MyProcessor ;
    :settings [ :settingName "test-value" ] .
Use structured logging:
logger.debug('Debug message with details', { messageId: message.id })
logger.info('Processing started')
logger.warn('Non-fatal issue occurred')
logger.error('Error processing message', { error: error.message })
Use proper RDF namespaces:
import ns from '../../utils/ns.js'
// Framework properties
ns.trn.filename
ns.trn.content
ns.trn.settings
// RDF properties
ns.rdf.type
ns.rdfs.label
Work with message properties:
// Safe property access
const value = message[propertyName] || defaultValue
// Check property existence
if (propertyName in message) {
    // Property exists
}
// Add metadata
message.tags = (message.tags || '') + '.my-processor'
message.processedBy = message.processedBy || []
message.processedBy.push('MyProcessor')
Following these patterns will ensure your custom processors integrate seamlessly with the Transmissions framework and follow established conventions.