Source: services/embeddings/EmbeddingCreator.js

import logger from 'loglevel';
import EmbeddingService from './EmbeddingService.js';
import SPARQLService from './SPARQLService.js';

/**
 * Service to create and store embeddings for articles
 */
class EmbeddingCreator {
    /**
     * Creates a new EmbeddingCreator
     * @param {Object} options - Configuration options
     * @param {EmbeddingService} options.embeddingService - The embedding service to use
     * @param {SPARQLService} options.sparqlService - The SPARQL service to use
     * @param {string} options.graphName - The graph name to work with
     * @param {string} options.contentPredicate - The predicate for content
     * @param {string} options.embeddingPredicate - The predicate for embeddings
     */
    constructor(options = {}) {
        this.embeddingService = options.embeddingService || new EmbeddingService();
        this.sparqlService = options.sparqlService || new SPARQLService();
        this.graphName = options.graphName || 'http://hyperdata.it/content';
        this.contentPredicate = options.contentPredicate || 'http://schema.org/articleBody';
        this.embeddingPredicate = options.embeddingPredicate || 'http://example.org/embedding/vector';

        this.stats = {
            total: 0,
            processed: 0,
            successful: 0,
            failed: 0,
            skipped: 0
        };

        logger.info(`EmbeddingCreator initialized for graph: ${this.graphName}`);
    }

    /**
     * Run the embedding creation process
     * @param {Object} options - Processing options
     * @param {number} options.limit - Maximum number of articles to process (0 for all)
     * @param {number} options.delay - Delay between articles in ms
     * @returns {Promise<Object>} The statistics from the run
     */
    async run(options = {}) {
        const limit = options.limit || 0;
        const delay = options.delay || 500;

        try {
            // First check if the graph exists
            logger.info(`Checking if graph <${this.graphName}> exists...`);

            const graphExists = await this.sparqlService.graphExists(this.graphName);
            if (!graphExists) {
                throw new Error(`Graph ${this.graphName} does not exist or is empty`);
            }

            logger.info(`Graph <${this.graphName}> exists and contains data`);

            // Execute the query to get articles
            const limitClause = limit > 0 ? `LIMIT ${limit}` : '';
            const query = `
                SELECT * WHERE {
                    GRAPH <${this.graphName}> {
                        ?article <${this.contentPredicate}> ?content
                    }
                }
                ${limitClause}
            `;

            const results = await this.sparqlService.executeQuery(query);
            const articles = results.results.bindings;

            this.stats.total = articles.length;
            logger.info(`Found ${articles.length} articles to process`);

            // Check if articles already have embeddings
            logger.info('Checking for existing embeddings...');

            const checkQuery = `
                SELECT ?article WHERE {
                    GRAPH <${this.graphName}> {
                        ?article <${this.embeddingPredicate}> ?embedding .
                    }
                }
            `;

            const existingEmbeddings = await this.sparqlService.executeQuery(checkQuery);
            const articlesWithEmbeddings = new Set();

            existingEmbeddings.results.bindings.forEach(binding => {
                articlesWithEmbeddings.add(binding.article.value);
            });

            logger.info(`Found ${articlesWithEmbeddings.size} articles with existing embeddings`);

            // Process each article
            for (let i = 0; i < articles.length; i++) {
                const article = articles[i];
                const articleUri = article.article.value;
                const content = article.content.value;

                this.stats.processed++;

                if (articlesWithEmbeddings.has(articleUri)) {
                    logger.info(`Skipping article ${i + 1}/${articles.length} (already has embedding): ${articleUri}`);
                    this.stats.skipped++;
                    continue;
                }

                logger.info(`Processing article ${i + 1}/${articles.length}: ${articleUri}`);

                // Validate content
                if (!content || content.trim().length < 10) {
                    logger.warn(`Skipping article with insufficient content: ${articleUri}`);
                    this.stats.skipped++;
                    continue;
                }

                // Generate embedding for content
                try {
                    const embedding = await this.embeddingService.generateEmbedding(content);

                    // Store the embedding in the SPARQL store
                    await this.sparqlService.storeEmbedding(
                        articleUri,
                        embedding,
                        this.graphName,
                        this.embeddingPredicate
                    );

                    logger.info(`Successfully processed article: ${articleUri}`);
                    this.stats.successful++;

                    // Space out requests to avoid overloading the embedding service
                    if (i < articles.length - 1) {
                        await new Promise(resolve => setTimeout(resolve, delay));
                    }
                } catch (error) {
                    logger.error(`Failed to process article ${articleUri}:`, error);
                    this.stats.failed++;
                    continue;
                }

                // Log progress every 10 articles
                if (this.stats.processed % 10 === 0 || this.stats.processed === this.stats.total) {
                    this.logProgress();
                }
            }

            logger.info('Completed embedding generation process');
            this.logFinalStats();

            return this.stats;
        } catch (error) {
            logger.error('Error during embedding creation:', error);
            throw error;
        }
    }

    /**
     * Log current progress
     */
    logProgress() {
        const progressPercent = Math.round(this.stats.processed / this.stats.total * 100);
        logger.info(`Progress: ${this.stats.processed}/${this.stats.total} articles (${progressPercent}%)`);
        logger.info(`Success: ${this.stats.successful}, Failed: ${this.stats.failed}, Skipped: ${this.stats.skipped}`);
    }

    /**
     * Log final statistics
     */
    logFinalStats() {
        const completionRate = Math.round((this.stats.successful + this.stats.skipped) / this.stats.total * 100);

        logger.info(`Final statistics:
- Total articles: ${this.stats.total}
- Successfully processed: ${this.stats.successful}
- Failed: ${this.stats.failed}
- Skipped (already had embeddings): ${this.stats.skipped}
- Overall completion rate: ${completionRate}%
`);
    }
}

export default EmbeddingCreator;