Source: services/document/Ingester.js

import SPARQLStore from '../../stores/SPARQLStore.js';
import { v4 as uuidv4 } from 'uuid';
import logger from 'loglevel';

/**
 * Document ingestion service with PROV-O integration.
 *
 * Turns a chunking result (source document, optional corpus/community and the
 * text chunks) into SPARQL UPDATE requests, executes them against the supplied
 * store, and offers query/delete helpers for the ingested data.
 *
 * Every IRI, string literal and number interpolated into a query passes through
 * formatIri / escapeString / formatNumber so that malformed or hostile input
 * cannot change the structure of the generated SPARQL.
 */
export default class Ingester {
  /**
   * @param {Object} store - Store exposing executeUpdate(query) and query(query)
   * @param {Object} [options] - Configuration options
   * @param {string} [options.graphName] - Target named graph IRI
   * @param {number} [options.batchSize] - Chunks per INSERT request (default 10)
   * @param {boolean} [options.enableProvenance] - Set to false to skip the PROV-O activity insert
   * @throws {Error} If no store is supplied
   */
  constructor(store, options = {}) {
    if (!store) {
      throw new Error('Ingester: SPARQL store is required');
    }

    this.store = store;
    this.config = {
      // Unknown options are preserved so they still flow through to ingest().
      ...options,
      // Known options are normalised AFTER the spread; otherwise an explicit
      // `undefined` (or a zero batch size) would clobber the defaults.
      graphName: options.graphName || 'http://hyperdata.it/content',
      batchSize: this.resolveBatchSize(options.batchSize),
      enableProvenance: options.enableProvenance !== false
    };
  }

  /**
   * Ingest chunked document data into SPARQL store
   * @param {Object} chunkingResult - Result from Chunker.chunk()
   * @param {Object} options - Ingestion options (override the instance config;
   *   `failFast: false` keeps going after a failed update)
   * @returns {Promise<Object>} Ingestion result; `success` is false and
   *   `failedCount` is non-zero when any update failed in non-fail-fast mode
   * @throws {Error} If the input is invalid or an update fails in fail-fast mode
   */
  async ingest(chunkingResult, options = {}) {
    if (!chunkingResult || !Array.isArray(chunkingResult.chunks)) {
      throw new Error('Ingester: chunkingResult with chunks is required');
    }

    try {
      const startTime = Date.now();

      // Per-call options win over the instance config, but keys explicitly set
      // to undefined must not erase a configured value.
      const ingestOptions = { ...this.config };
      for (const [key, value] of Object.entries(options || {})) {
        if (value !== undefined) {
          ingestOptions[key] = value;
        }
      }

      const activityId = uuidv4();

      // Create ingestion activity for provenance
      const activity = this.createIngestionActivity(activityId, chunkingResult);

      // Prepare ingestion data
      const ingestionData = await this.prepareIngestionData(chunkingResult, activity, ingestOptions);

      // Execute SPARQL UPDATE operations
      const results = await this.executeIngestion(ingestionData, ingestOptions);

      const processingTime = Date.now() - startTime;
      const chunkCount = chunkingResult.chunks.length;
      // Only reachable with failures when failFast is disabled.
      const failedCount = results.filter(entry => !entry.success).length;

      if (failedCount === 0) {
        logger.info(`Ingester: Successfully ingested ${chunkCount} chunks in ${processingTime}ms`);
      } else {
        logger.warn(`Ingester: Ingested ${chunkCount} chunks with ${failedCount} failed update(s) in ${processingTime}ms`);
      }

      return {
        success: failedCount === 0,
        chunkCount,
        failedCount,
        processingTime,
        activityId,
        graphName: ingestOptions.graphName,
        results,
        metadata: {
          sourceUri: chunkingResult.sourceUri,
          corpusUri: chunkingResult.corpus?.uri,
          communityUri: chunkingResult.community?.uri,
          ingestionTimestamp: new Date().toISOString()
        }
      };

    } catch (error) {
      logger.error('Ingester: Error during ingestion:', error.message);
      throw new Error(`Ingester: Failed to ingest chunks: ${error.message}`, { cause: error });
    }
  }

  /**
   * Prepare data for SPARQL insertion
   * @private
   * @param {Object} chunkingResult - Chunking result
   * @param {Object} activity - PROV-O activity
   * @param {Object} options - Options (graphName, batchSize, enableProvenance)
   * @returns {Promise<Array>} Prepared SPARQL update queries, in execution order
   */
  async prepareIngestionData(chunkingResult, activity, options) {
    const { graphName, batchSize, enableProvenance } = options;
    const updates = [];

    // 1. Insert source document
    updates.push(this.createDocumentInsertQuery(chunkingResult, activity, graphName));

    // 2. Insert corpus
    if (chunkingResult.corpus) {
      updates.push(this.createCorpusInsertQuery(chunkingResult.corpus, activity, graphName));
    }

    // 3. Insert community
    if (chunkingResult.community) {
      updates.push(this.createCommunityInsertQuery(chunkingResult.community, activity, graphName));
    }

    // 4. Insert chunks (text elements), one request per batch
    for (const batch of this.batchChunks(chunkingResult.chunks, batchSize)) {
      updates.push(this.createChunksBatchInsertQuery(batch, activity, graphName));
    }

    // 5. Insert provenance data if enabled
    if (enableProvenance) {
      updates.push(this.createProvenanceInsertQuery(activity, chunkingResult, graphName));
    }

    return updates;
  }

  /**
   * Execute SPARQL UPDATE operations sequentially
   * @private
   * @param {Array} updateQueries - SPARQL UPDATE queries
   * @param {Object} options - Options; `failFast: false` records failures and continues
   * @returns {Promise<Array>} One result entry per attempted update
   * @throws {Error} On the first failed update unless failFast is false
   */
  async executeIngestion(updateQueries, options = {}) {
    const failFast = options?.failFast !== false;
    const results = [];

    for (const [index, query] of updateQueries.entries()) {
      const position = index + 1;

      try {
        logger.debug(`Ingester: Executing update ${position}/${updateQueries.length}`);

        // Execute SPARQL UPDATE
        const result = await this.store.executeUpdate(query);

        results.push({ index, success: true, result });

      } catch (error) {
        logger.error(`Ingester: Failed to execute update ${position}:`, error.message);

        results.push({
          index,
          success: false,
          error: error.message,
          query: `${String(query).substring(0, 200)}...` // Truncated for logging
        });

        // Decide whether to continue or abort
        if (failFast) {
          throw new Error(`Ingester: Update ${position} failed: ${error.message}`, { cause: error });
        }
      }
    }

    return results;
  }

  /**
   * Create PROV-O ingestion activity
   * @private
   * @param {string} activityId - Activity ID
   * @param {Object} chunkingResult - Chunking result
   * @returns {Object} PROV-O activity
   */
  createIngestionActivity(activityId, chunkingResult) {
    return {
      uri: `${this.config.graphName}/activity/${activityId}`,
      type: 'prov:Activity',
      label: 'Document Ingestion Activity',
      startedAtTime: new Date().toISOString(),
      // Full IRI: the value is emitted inside <...>, where a prefixed name such
      // as "semem:Ingester" would NOT be expanded against the semem namespace.
      wasAssociatedWith: 'http://purl.org/stuff/semem/Ingester',
      used: [chunkingResult.sourceUri].filter(Boolean),
      generated: [
        chunkingResult.corpus?.uri,
        chunkingResult.community?.uri,
        ...chunkingResult.chunks.map(c => c.uri)
      ].filter(Boolean)
    };
  }

  /**
   * Create SPARQL INSERT query for source document
   * @private
   * @param {Object} chunkingResult - Chunking result
   * @param {Object} activity - PROV-O activity
   * @param {string} graphName - Target graph
   * @returns {string} SPARQL UPDATE query
   * @throws {Error} If the graph, source or activity IRI is invalid
   */
  createDocumentInsertQuery(chunkingResult, activity, graphName) {
    // Metadata is optional; every field below has its own fallback.
    const metadata = chunkingResult.metadata || {};
    const created = metadata.timestamp || new Date().toISOString();

    return `
      PREFIX ragno: <http://purl.org/stuff/ragno/>
      PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
      PREFIX dcterms: <http://purl.org/dc/terms/>
      PREFIX semem: <http://purl.org/stuff/semem/>
      PREFIX prov: <http://www.w3.org/ns/prov#>
      PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
      
      INSERT DATA {
        GRAPH ${this.formatIri(graphName, 'graphName')} {
          ${this.formatIri(chunkingResult.sourceUri, 'sourceUri')} a ragno:Corpus ;
            rdfs:label "${this.escapeString(metadata.title || 'Source Document')}" ;
            dcterms:created "${this.escapeString(created)}"^^xsd:dateTime ;
            semem:sourceFile "${this.escapeString(metadata.sourceFile || '')}" ;
            semem:format "${this.escapeString(metadata.format || 'unknown')}" ;
            semem:fileSize ${this.formatNumber(metadata.fileSize)} ;
            prov:wasGeneratedBy ${this.formatIri(activity.uri, 'activity.uri')} .
        }
      }
    `;
  }

  /**
   * Create SPARQL INSERT query for corpus
   * @private
   * @param {Object} corpus - Corpus object
   * @param {Object} activity - PROV-O activity
   * @param {string} graphName - Target graph
   * @returns {string} SPARQL UPDATE query
   * @throws {Error} If any interpolated IRI is invalid
   */
  createCorpusInsertQuery(corpus, activity, graphName) {
    // The derivation link is only written when the corpus actually carries one.
    const derivedFrom = corpus.wasDerivedFrom
      ? `prov:wasDerivedFrom ${this.formatIri(corpus.wasDerivedFrom, 'corpus.wasDerivedFrom')} ;`
      : '';

    return `
      PREFIX ragno: <http://purl.org/stuff/ragno/>
      PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
      PREFIX dcterms: <http://purl.org/dc/terms/>
      PREFIX prov: <http://www.w3.org/ns/prov#>
      
      INSERT DATA {
        GRAPH ${this.formatIri(graphName, 'graphName')} {
          ${this.formatIri(corpus.uri, 'corpus.uri')} a ragno:Corpus ;
            rdfs:label "${this.escapeString(corpus.label)}" ;
            dcterms:description "${this.escapeString(corpus.description)}" ;
            ragno:memberCount ${this.formatNumber(corpus.memberCount)} ;
            ${derivedFrom}
            prov:wasGeneratedBy ${this.formatIri(activity.uri, 'activity.uri')} .
        }
      }
    `;
  }

  /**
   * Create SPARQL INSERT query for community
   * @private
   * @param {Object} community - Community object
   * @param {Object} activity - PROV-O activity
   * @param {string} graphName - Target graph
   * @returns {string} SPARQL UPDATE query
   * @throws {Error} If any interpolated IRI is invalid
   */
  createCommunityInsertQuery(community, activity, graphName) {
    const communityIri = this.formatIri(community.uri, 'community.uri');
    const stats = community.metadata || {};

    const elementTriples = (community.hasCommunityElement ?? [])
      .map(elem => `${communityIri} ragno:hasCommunityElement ${this.formatIri(elem.element, 'community element')} .`)
      .join('\n            ');

    return `
      PREFIX ragno: <http://purl.org/stuff/ragno/>
      PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
      PREFIX dcterms: <http://purl.org/dc/terms/>
      PREFIX prov: <http://www.w3.org/ns/prov#>
      
      INSERT DATA {
        GRAPH ${this.formatIri(graphName, 'graphName')} {
          ${communityIri} a ragno:Community ;
            rdfs:label "${this.escapeString(community.label)}" ;
            dcterms:description "${this.escapeString(community.description)}" ;
            ragno:elementCount ${this.formatNumber(stats.elementCount)} ;
            ragno:cohesion ${this.formatNumber(stats.cohesion)} ;
            prov:wasGeneratedBy ${this.formatIri(activity.uri, 'activity.uri')} .
          ${elementTriples}
        }
      }
    `;
  }

  /**
   * Create SPARQL INSERT query for chunks batch
   * @private
   * @param {Array} chunks - Chunk batch
   * @param {Object} activity - PROV-O activity
   * @param {string} graphName - Target graph
   * @returns {string} SPARQL UPDATE query
   * @throws {Error} If any interpolated IRI is invalid
   */
  createChunksBatchInsertQuery(chunks, activity, graphName) {
    const activityIri = this.formatIri(activity.uri, 'activity.uri');

    const chunkTriples = chunks.map(chunk => {
      const parentIri = this.formatIri(chunk.partOf, 'chunk.partOf');

      return `
        ${this.formatIri(chunk.uri, 'chunk.uri')} a ragno:TextElement ;
          rdfs:label "${this.escapeString(chunk.title)}" ;
          ragno:hasContent """${this.escapeString(chunk.content)}""" ;
          ragno:size ${this.formatNumber(chunk.size)} ;
          ragno:index ${this.formatNumber(chunk.index)} ;
          dcterms:isPartOf ${parentIri} ;
          semem:contentHash "${this.escapeString(chunk.metadata?.hash)}" ;
          prov:wasDerivedFrom ${parentIri} ;
          prov:wasGeneratedBy ${activityIri} .
      `;
    }).join('\n');

    return `
      PREFIX ragno: <http://purl.org/stuff/ragno/>
      PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
      PREFIX dcterms: <http://purl.org/dc/terms/>
      PREFIX semem: <http://purl.org/stuff/semem/>
      PREFIX prov: <http://www.w3.org/ns/prov#>
      
      INSERT DATA {
        GRAPH ${this.formatIri(graphName, 'graphName')} {
          ${chunkTriples}
        }
      }
    `;
  }

  /**
   * Create SPARQL INSERT query for provenance data
   * @private
   * @param {Object} activity - PROV-O activity
   * @param {Object} chunkingResult - Chunking result (currently unused; kept for signature stability)
   * @param {string} graphName - Target graph
   * @returns {string} SPARQL UPDATE query
   * @throws {Error} If any interpolated IRI is invalid
   */
  createProvenanceInsertQuery(activity, chunkingResult, graphName) {
    const activityIri = this.formatIri(activity.uri, 'activity.uri');
    const linkTriples = (predicate, uris) => (uris ?? [])
      .map(uri => `${activityIri} ${predicate} ${this.formatIri(uri, predicate)} .`)
      .join('\n            ');

    const usedTriples = linkTriples('prov:used', activity.used);
    const generatedTriples = linkTriples('prov:generated', activity.generated);

    return `
      PREFIX prov: <http://www.w3.org/ns/prov#>
      PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
      PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
      
      INSERT DATA {
        GRAPH ${this.formatIri(graphName, 'graphName')} {
          ${activityIri} a prov:Activity ;
            rdfs:label "${this.escapeString(activity.label)}" ;
            prov:startedAtTime "${this.escapeString(activity.startedAtTime)}"^^xsd:dateTime ;
            prov:wasAssociatedWith ${this.formatIri(activity.wasAssociatedWith, 'activity.wasAssociatedWith')} .
          ${usedTriples}
          ${generatedTriples}
        }
      }
    `;
  }

  /**
   * Split chunks into batches for processing
   * @private
   * @param {Array} chunks - Chunks to batch
   * @param {number} batchSize - Size of each batch; invalid values fall back to 10
   * @returns {Array<Array>} Batched chunks
   */
  batchChunks(chunks, batchSize) {
    // A zero/negative step would never terminate and a NaN step would yield a
    // single empty batch, so the size is normalised before it drives the loop.
    const size = this.resolveBatchSize(batchSize);
    const batches = [];
    for (let start = 0; start < chunks.length; start += size) {
      batches.push(chunks.slice(start, start + size));
    }
    return batches;
  }

  /**
   * Normalise a batch size to a positive integer
   * @private
   * @param {*} batchSize - Requested batch size
   * @param {number} [fallback=10] - Value used when the request is not a positive integer
   * @returns {number} Positive integer batch size
   */
  resolveBatchSize(batchSize, fallback = 10) {
    const size = Math.floor(Number(batchSize));
    return Number.isSafeInteger(size) && size >= 1 ? size : fallback;
  }

  /**
   * Escape string for SPARQL
   * @private
   * @param {*} str - Value to escape; null/undefined become '', other
   *   non-strings are converted with String()
   * @returns {string} Escaped string, safe inside "..." and """..."""
   */
  escapeString(str) {
    if (str === null || str === undefined) return '';

    const replacements = {
      '\\': '\\\\',
      '"': '\\"',
      '\n': '\\n',
      '\r': '\\r',
      '\t': '\\t'
    };

    // Single pass, so a backslash produced by one escape is never re-escaped.
    return String(str).replace(/[\\"\n\r\t]/g, ch => replacements[ch]);
  }

  /**
   * Validate an IRI and wrap it in angle brackets for use in SPARQL
   * @private
   * @param {string} value - IRI to embed
   * @param {string} [label='IRI'] - Field name used in the error message
   * @returns {string} The IRI as `<...>`
   * @throws {Error} If the value is not a non-empty string or contains a
   *   character the SPARQL IRIREF production forbids
   */
  formatIri(value, label = 'IRI') {
    // Forbidden inside <...>: control chars, space, < > " { } | ^ ` and backslash.
    const forbidden = /[\u0000-\u0020<>"{}|^`\\]/;
    if (typeof value !== 'string' || value === '' || forbidden.test(value)) {
      throw new Error(`Ingester: invalid IRI for ${label}: ${JSON.stringify(value)}`);
    }
    return `<${value}>`;
  }

  /**
   * Render a value as a SPARQL numeric literal
   * @private
   * @param {*} value - Value expected to be numeric
   * @param {number} [fallback=0] - Used when the value is not a finite number
   * @returns {string} Numeric literal text
   */
  formatNumber(value, fallback = 0) {
    const number = typeof value === 'number' ? value : Number(value);
    return String(Number.isFinite(number) ? number : fallback);
  }

  /**
   * Render an optional LIMIT clause
   * @private
   * @param {*} limit - Requested limit
   * @returns {string} `LIMIT n` for a positive integer, otherwise ''
   */
  formatLimit(limit) {
    const count = Math.floor(Number(limit));
    return Number.isSafeInteger(count) && count > 0 ? `LIMIT ${count}` : '';
  }

  /**
   * Query ingested documents
   * @param {Object} options - Query options (graphName, limit)
   * @returns {Promise<Array>} Query results
   * @throws {Error} If the graph IRI is invalid or the store query fails
   */
  async queryDocuments(options = {}) {
    const graphIri = this.formatIri(options.graphName || this.config.graphName, 'graphName');

    const query = `
      PREFIX ragno: <http://purl.org/stuff/ragno/>
      PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
      PREFIX dcterms: <http://purl.org/dc/terms/>
      PREFIX semem: <http://purl.org/stuff/semem/>

      SELECT DISTINCT ?document ?label ?created ?format ?chunks WHERE {
        GRAPH ${graphIri} {
          ?document a ragno:Corpus ;
            rdfs:label ?label .
          OPTIONAL { ?document dcterms:created ?created }
          OPTIONAL { ?document semem:format ?format }
          {
            SELECT ?document (COUNT(?chunk) as ?chunks) WHERE {
              ?chunk dcterms:isPartOf ?document .
            } GROUP BY ?document
          }
        }
      }
      ORDER BY DESC(?created)
      ${this.formatLimit(options.limit)}
    `;

    try {
      return await this.store.query(query);
    } catch (error) {
      logger.error('Ingester: Error querying documents:', error.message);
      throw new Error(`Ingester: Failed to query documents: ${error.message}`, { cause: error });
    }
  }

  /**
   * Query chunks for a specific document
   * @param {string} documentUri - Document URI
   * @param {Object} options - Query options (graphName, limit)
   * @returns {Promise<Array>} Query results
   * @throws {Error} If documentUri is missing/invalid or the store query fails
   */
  async queryDocumentChunks(documentUri, options = {}) {
    if (!documentUri) {
      throw new Error('Ingester: documentUri is required');
    }

    const graphIri = this.formatIri(options.graphName || this.config.graphName, 'graphName');
    const documentIri = this.formatIri(documentUri, 'documentUri');

    const query = `
      PREFIX ragno: <http://purl.org/stuff/ragno/>
      PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
      PREFIX dcterms: <http://purl.org/dc/terms/>

      SELECT ?chunk ?label ?content ?size ?index WHERE {
        GRAPH ${graphIri} {
          ?chunk a ragno:TextElement ;
            dcterms:isPartOf ${documentIri} ;
            rdfs:label ?label ;
            ragno:hasContent ?content ;
            ragno:size ?size ;
            ragno:index ?index .
        }
      }
      ORDER BY ?index
      ${this.formatLimit(options.limit)}
    `;

    try {
      return await this.store.query(query);
    } catch (error) {
      logger.error('Ingester: Error querying document chunks:', error.message);
      throw new Error(`Ingester: Failed to query document chunks: ${error.message}`, { cause: error });
    }
  }

  /**
   * Delete document and all related data
   *
   * Removes every triple whose subject is the document itself, a resource that
   * is dcterms:isPartOf the document, or a resource prov:wasDerivedFrom it.
   *
   * @param {string} documentUri - Document URI to delete
   * @param {Object} options - Delete options (graphName)
   * @returns {Promise<Object>} Delete result
   * @throws {Error} If documentUri is missing/invalid or the store update fails
   */
  async deleteDocument(documentUri, options = {}) {
    if (!documentUri) {
      throw new Error('Ingester: documentUri is required');
    }

    const graphIri = this.formatIri(options.graphName || this.config.graphName, 'graphName');
    const documentIri = this.formatIri(documentUri, 'documentUri');

    // The three subject groups are alternatives (UNION). Listing them in one
    // basic graph pattern would join them, so a document lacking chunks or
    // derived resources would match nothing and never be deleted.
    const deleteQuery = `
      PREFIX dcterms: <http://purl.org/dc/terms/>
      PREFIX prov: <http://www.w3.org/ns/prov#>

      DELETE {
        GRAPH ${graphIri} { ?subject ?predicate ?object }
      }
      WHERE {
        GRAPH ${graphIri} {
          {
            ?subject dcterms:isPartOf ${documentIri} .
          } UNION {
            ?subject prov:wasDerivedFrom ${documentIri} .
          } UNION {
            VALUES ?subject { ${documentIri} }
          }
          ?subject ?predicate ?object .
        }
      }
    `;

    try {
      const result = await this.store.executeUpdate(deleteQuery);
      logger.info(`Ingester: Deleted document ${documentUri}`);

      return {
        success: true,
        documentUri,
        result
      };
    } catch (error) {
      logger.error('Ingester: Error deleting document:', error.message);
      throw new Error(`Ingester: Failed to delete document: ${error.message}`, { cause: error });
    }
  }
}