644 lines
21 KiB
TypeScript
644 lines
21 KiB
TypeScript
export interface JobProgress {
|
|
jobId: string;
|
|
type: 's3-sync' | 'song-matching';
|
|
status: 'running' | 'completed' | 'failed';
|
|
progress: number; // 0-100
|
|
current: number;
|
|
total: number;
|
|
message: string;
|
|
startTime: Date;
|
|
endTime?: Date;
|
|
result?: any;
|
|
error?: string;
|
|
}
|
|
|
|
export interface JobOptions {
|
|
type: 'storage-sync' | 'song-matching';
|
|
options?: any;
|
|
}
|
|
|
|
class BackgroundJobService {
|
|
private jobs: Map<string, JobProgress> = new Map();
|
|
private jobIdCounter = 0;
|
|
|
|
/**
|
|
* Start a new background job
|
|
*/
|
|
async startJob(jobOptions: JobOptions): Promise<string> {
|
|
const jobId = `job_${++this.jobIdCounter}_${Date.now()}`;
|
|
|
|
const job: JobProgress = {
|
|
jobId,
|
|
type: jobOptions.type,
|
|
status: 'running',
|
|
progress: 0,
|
|
current: 0,
|
|
total: 0,
|
|
message: `Starting ${jobOptions.type}...`,
|
|
startTime: new Date(),
|
|
};
|
|
|
|
this.jobs.set(jobId, job);
|
|
|
|
// Start the job in the background
|
|
this.runJob(jobId, jobOptions).catch(error => {
|
|
console.error(`Job ${jobId} failed:`, error);
|
|
const job = this.jobs.get(jobId);
|
|
if (job) {
|
|
job.status = 'failed';
|
|
job.error = error.message;
|
|
job.endTime = new Date();
|
|
}
|
|
});
|
|
|
|
return jobId;
|
|
}
|
|
|
|
/**
|
|
* Get job progress
|
|
*/
|
|
getJobProgress(jobId: string): JobProgress | null {
|
|
return this.jobs.get(jobId) || null;
|
|
}
|
|
|
|
/**
|
|
* Get all jobs
|
|
*/
|
|
getAllJobs(): JobProgress[] {
|
|
return Array.from(this.jobs.values());
|
|
}
|
|
|
|
/**
|
|
* Update job progress
|
|
*/
|
|
updateProgress(jobId: string, progress: Partial<JobProgress>): void {
|
|
const job = this.jobs.get(jobId);
|
|
if (job) {
|
|
Object.assign(job, progress);
|
|
|
|
// Calculate percentage if current and total are provided
|
|
if (progress.current !== undefined && progress.total !== undefined && progress.total > 0) {
|
|
job.progress = Math.round((progress.current / progress.total) * 100);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Complete a job
|
|
*/
|
|
completeJob(jobId: string, result?: any): void {
|
|
const job = this.jobs.get(jobId);
|
|
if (job) {
|
|
job.status = 'completed';
|
|
job.progress = 100;
|
|
job.endTime = new Date();
|
|
job.result = result;
|
|
job.message = 'Job completed successfully';
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Fail a job
|
|
*/
|
|
failJob(jobId: string, error: string): void {
|
|
const job = this.jobs.get(jobId);
|
|
if (job) {
|
|
job.status = 'failed';
|
|
job.error = error;
|
|
job.endTime = new Date();
|
|
job.message = `Job failed: ${error}`;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Clean up old completed jobs (keep last 10)
|
|
*/
|
|
cleanupOldJobs(): void {
|
|
const allJobs = Array.from(this.jobs.values());
|
|
const completedJobs = allJobs.filter(job => job.status === 'completed' || job.status === 'failed');
|
|
|
|
if (completedJobs.length > 10) {
|
|
// Sort by end time and remove oldest
|
|
completedJobs.sort((a, b) => {
|
|
const aTime = a.endTime?.getTime() || 0;
|
|
const bTime = b.endTime?.getTime() || 0;
|
|
return aTime - bTime;
|
|
});
|
|
|
|
const toRemove = completedJobs.slice(0, completedJobs.length - 10);
|
|
toRemove.forEach(job => {
|
|
this.jobs.delete(job.jobId);
|
|
});
|
|
|
|
console.log(`🧹 Cleaned up ${toRemove.length} old jobs`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Run the actual job
|
|
*/
|
|
private async runJob(jobId: string, jobOptions: JobOptions): Promise<void> {
|
|
try {
|
|
switch (jobOptions.type) {
|
|
case 'storage-sync':
|
|
await this.runStorageSyncJob(jobId, jobOptions.options);
|
|
break;
|
|
case 'song-matching':
|
|
await this.runSongMatchingJob(jobId, jobOptions.options);
|
|
break;
|
|
default:
|
|
throw new Error(`Unknown job type: ${jobOptions.type}`);
|
|
}
|
|
} catch (error) {
|
|
this.failJob(jobId, error instanceof Error ? error.message : 'Unknown error');
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Run storage sync job (works with any storage provider)
|
|
*/
|
|
private async runStorageSyncJob(jobId: string, options?: any): Promise<void> {
|
|
try {
|
|
// Import here to avoid circular dependencies
|
|
const { StorageProviderFactory } = await import('./storageProvider.js');
|
|
const { AudioMetadataService } = await import('./audioMetadataService.js');
|
|
const { SongMatchingService } = await import('./songMatchingService.js');
|
|
const { MusicFile } = await import('../models/MusicFile.js');
|
|
const { Song } = await import('../models/Song.js');
|
|
|
|
// Get the configured storage provider
|
|
const config = await StorageProviderFactory.loadConfig();
|
|
const storageService = await StorageProviderFactory.createProvider(config);
|
|
const audioMetadataService = new AudioMetadataService();
|
|
const songMatchingService = new SongMatchingService();
|
|
|
|
// Helper to set correct MIME type based on file extension
|
|
const guessContentType = (fileName: string): string => {
|
|
const ext = (fileName.split('.').pop() || '').toLowerCase();
|
|
switch (ext) {
|
|
case 'mp3': return 'audio/mpeg';
|
|
case 'wav': return 'audio/wav';
|
|
case 'flac': return 'audio/flac';
|
|
case 'm4a': return 'audio/mp4';
|
|
case 'aac': return 'audio/aac';
|
|
case 'ogg': return 'audio/ogg';
|
|
case 'opus': return 'audio/opus';
|
|
case 'wma': return 'audio/x-ms-wma';
|
|
default: return 'application/octet-stream';
|
|
}
|
|
};
|
|
|
|
// Optional: clear existing links for a force re-sync
|
|
if (options?.clearLinks) {
|
|
this.updateProgress(jobId, {
|
|
message: 'Clearing existing song-file links...',
|
|
current: 0,
|
|
total: 0
|
|
});
|
|
|
|
await Promise.all([
|
|
// Reset song s3File metadata
|
|
Song.updateMany({}, {
|
|
$set: {
|
|
's3File.musicFileId': null,
|
|
's3File.s3Key': null,
|
|
's3File.s3Url': null,
|
|
's3File.streamingUrl': null,
|
|
's3File.hasS3File': false
|
|
}
|
|
}),
|
|
// Unlink music files
|
|
MusicFile.updateMany({}, { $unset: { songId: '' } })
|
|
]);
|
|
}
|
|
|
|
// Phase 1: Quick filename matching
|
|
this.updateProgress(jobId, {
|
|
message: `Phase 1: Fetching files from ${config.provider.toUpperCase()}...`,
|
|
current: 0,
|
|
total: 0
|
|
});
|
|
|
|
const storageFiles = await storageService.listAllFiles();
|
|
const audioFiles = storageFiles.filter(storageFile => {
|
|
const filename = storageFile.key.split('/').pop() || storageFile.key;
|
|
return audioMetadataService.isAudioFile(filename);
|
|
});
|
|
|
|
this.updateProgress(jobId, {
|
|
message: `Phase 1: Found ${audioFiles.length} audio files, checking database...`,
|
|
current: 0,
|
|
total: audioFiles.length
|
|
});
|
|
|
|
// Get existing files
|
|
const existingFiles = await MusicFile.find({}, { s3Key: 1 });
|
|
const existingStorageKeys = new Set(existingFiles.map(f => f.s3Key));
|
|
const newAudioFiles = options?.force ? audioFiles : audioFiles.filter(storageFile => !existingStorageKeys.has(storageFile.key));
|
|
|
|
this.updateProgress(jobId, {
|
|
message: `Phase 1: Processing ${newAudioFiles.length} new audio files...`,
|
|
current: 0,
|
|
total: newAudioFiles.length
|
|
});
|
|
|
|
// Get all songs for filename matching
|
|
const allSongs = await Song.find({}, { id: 1, title: 1, artist: 1, location: 1 });
|
|
|
|
const quickMatches: any[] = [];
|
|
const unmatchedFiles: any[] = [];
|
|
let processedCount = 0;
|
|
let phase1Errors = 0;
|
|
|
|
for (const storageFile of newAudioFiles) {
|
|
processedCount++;
|
|
|
|
try {
|
|
const filename = storageFile.key.split('/').pop() || storageFile.key;
|
|
|
|
this.updateProgress(jobId, {
|
|
message: `Phase 1: Quick filename matching`,
|
|
current: processedCount,
|
|
total: newAudioFiles.length
|
|
});
|
|
|
|
// Quick filename matching logic
|
|
// Decode URL-encoded sequences so %20, %27 etc. are compared correctly
|
|
const safeDecode = (s: string): string => { try { return decodeURIComponent(s); } catch { return s; } };
|
|
const stripDiacritics = (s: string) => s.normalize('NFKD').replace(/[\u0300-\u036f]/g, '');
|
|
const normalizedStorageFilename = stripDiacritics(safeDecode(filename)).replace(/\.[^/.]+$/, '').toLowerCase();
|
|
let matchedSong = null;
|
|
|
|
for (const song of allSongs) {
|
|
if (song.location) {
|
|
const rekordboxFilename = song.location.split(/[/\\]/).pop() || song.location;
|
|
const normalizedRekordboxFilename = stripDiacritics(safeDecode(rekordboxFilename)).replace(/\.[^/.]+$/, '').toLowerCase();
|
|
|
|
if (normalizedStorageFilename === normalizedRekordboxFilename) {
|
|
matchedSong = song;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (matchedSong) {
|
|
const basicMetadata = audioMetadataService.extractBasicMetadataFromFilename(filename);
|
|
// Reuse existing MusicFile if present (force mode), otherwise create new
|
|
let musicFile = await MusicFile.findOne({ s3Key: storageFile.key });
|
|
if (!musicFile) {
|
|
musicFile = new MusicFile({ s3Key: storageFile.key });
|
|
}
|
|
musicFile.originalName = filename;
|
|
musicFile.s3Key = storageFile.key;
|
|
musicFile.s3Url = await storageService.getPresignedUrl(storageFile.key);
|
|
musicFile.contentType = guessContentType(filename);
|
|
musicFile.size = storageFile.size;
|
|
Object.assign(musicFile, basicMetadata);
|
|
musicFile.songId = matchedSong._id;
|
|
|
|
await musicFile.save();
|
|
quickMatches.push(musicFile);
|
|
|
|
// Update the Song document to indicate it has a storage file
|
|
const storageUrl = await storageService.getPresignedUrl(storageFile.key);
|
|
await Song.updateOne(
|
|
{ _id: matchedSong._id },
|
|
{
|
|
$set: {
|
|
's3File.musicFileId': musicFile._id,
|
|
's3File.s3Key': storageFile.key,
|
|
's3File.s3Url': storageUrl,
|
|
's3File.streamingUrl': storageUrl,
|
|
's3File.hasS3File': true
|
|
}
|
|
}
|
|
);
|
|
|
|
console.log(`✅ Quick match saved immediately: ${filename}`);
|
|
} else {
|
|
unmatchedFiles.push(storageFile);
|
|
}
|
|
|
|
|
|
|
|
} catch (error) {
|
|
console.error(`Error in quick matching ${storageFile.key}:`, error);
|
|
unmatchedFiles.push(storageFile);
|
|
phase1Errors++;
|
|
}
|
|
}
|
|
|
|
this.updateProgress(jobId, {
|
|
message: `Phase 1 completed: ${quickMatches.length} quick matches, ${unmatchedFiles.length} unmatched files`,
|
|
current: newAudioFiles.length,
|
|
total: newAudioFiles.length
|
|
});
|
|
|
|
// Phase 2: Complex matching for unmatched files
|
|
if (unmatchedFiles.length > 0) {
|
|
this.updateProgress(jobId, {
|
|
message: `Phase 2: Complex matching for ${unmatchedFiles.length} files...`,
|
|
current: 0,
|
|
total: unmatchedFiles.length
|
|
});
|
|
|
|
let complexMatches = 0;
|
|
let stillUnmatched = 0;
|
|
let phase2Errors = 0;
|
|
const processedFiles: any[] = [];
|
|
|
|
for (let i = 0; i < unmatchedFiles.length; i++) {
|
|
const storageFile = unmatchedFiles[i];
|
|
|
|
try {
|
|
const filename = storageFile.key.split('/').pop() || storageFile.key;
|
|
|
|
this.updateProgress(jobId, {
|
|
message: `Phase 2: Complex matching`,
|
|
current: i + 1,
|
|
total: unmatchedFiles.length
|
|
});
|
|
|
|
// Download file and extract metadata
|
|
const fileBuffer = await storageService.getFileContent(storageFile.key);
|
|
const metadata = await audioMetadataService.extractMetadata(fileBuffer, filename);
|
|
|
|
// Reuse existing MusicFile document if present to avoid duplicate key errors
|
|
let musicFile = await MusicFile.findOne({ s3Key: storageFile.key });
|
|
if (!musicFile) {
|
|
musicFile = new MusicFile({ s3Key: storageFile.key });
|
|
}
|
|
musicFile.originalName = filename;
|
|
musicFile.s3Key = storageFile.key;
|
|
musicFile.s3Url = await storageService.getPresignedUrl(storageFile.key);
|
|
musicFile.contentType = guessContentType(filename);
|
|
musicFile.size = storageFile.size;
|
|
Object.assign(musicFile, metadata);
|
|
|
|
// Try complex matching
|
|
const matchResult = await songMatchingService.matchMusicFileToSongs(musicFile, {
|
|
minConfidence: 0.7,
|
|
enableFuzzyMatching: true,
|
|
enablePartialMatching: true,
|
|
maxResults: 1
|
|
});
|
|
|
|
if (matchResult.length > 0 && matchResult[0].confidence >= 0.7) {
|
|
const bestMatch = matchResult[0];
|
|
musicFile.songId = bestMatch.song._id;
|
|
complexMatches++;
|
|
|
|
// Update the Song document to indicate it has a storage file
|
|
const storageUrl = await storageService.getPresignedUrl(storageFile.key);
|
|
await Song.updateOne(
|
|
{ _id: bestMatch.song._id },
|
|
{
|
|
$set: {
|
|
's3File.musicFileId': musicFile._id,
|
|
's3File.s3Key': storageFile.key,
|
|
's3File.s3Url': storageUrl,
|
|
's3File.streamingUrl': storageUrl,
|
|
's3File.hasS3File': true
|
|
}
|
|
}
|
|
);
|
|
} else {
|
|
stillUnmatched++;
|
|
}
|
|
|
|
// Save immediately for real-time availability (create or update)
|
|
await musicFile.save();
|
|
processedFiles.push(musicFile);
|
|
|
|
console.log(`✅ Complex match saved immediately: ${filename} (confidence: ${matchResult.length > 0 ? matchResult[0].confidence : 'N/A'})`);
|
|
|
|
|
|
|
|
} catch (error) {
|
|
console.error(`Error processing ${storageFile.key}:`, error);
|
|
stillUnmatched++;
|
|
phase2Errors++;
|
|
}
|
|
}
|
|
|
|
this.updateProgress(jobId, {
|
|
message: `Phase 2 completed: ${complexMatches} complex matches, ${stillUnmatched} still unmatched`,
|
|
current: unmatchedFiles.length,
|
|
total: unmatchedFiles.length
|
|
});
|
|
|
|
// All files have been saved immediately during processing
|
|
console.log(`✅ All files saved immediately during processing`);
|
|
|
|
const result = {
|
|
phase1: {
|
|
totalFiles: newAudioFiles.length,
|
|
quickMatches: quickMatches.length,
|
|
unmatchedFiles: unmatchedFiles.length,
|
|
errors: 0
|
|
},
|
|
phase2: {
|
|
processedFiles: processedFiles.length,
|
|
complexMatches,
|
|
stillUnmatched,
|
|
errors: 0
|
|
},
|
|
total: {
|
|
processed: newAudioFiles.length,
|
|
matched: quickMatches.length + complexMatches,
|
|
unmatched: stillUnmatched,
|
|
errors: 0
|
|
}
|
|
};
|
|
|
|
this.completeJob(jobId, result);
|
|
} else {
|
|
// No unmatched files, all quick matches have been saved immediately
|
|
console.log(`✅ All quick matches saved immediately during processing`);
|
|
|
|
const result = {
|
|
phase1: {
|
|
totalFiles: newAudioFiles.length,
|
|
quickMatches: quickMatches.length,
|
|
unmatchedFiles: 0,
|
|
errors: 0
|
|
},
|
|
phase2: {
|
|
processedFiles: 0,
|
|
complexMatches: 0,
|
|
stillUnmatched: 0,
|
|
errors: 0
|
|
},
|
|
total: {
|
|
processed: quickMatches.length,
|
|
matched: quickMatches.length,
|
|
unmatched: 0,
|
|
errors: 0
|
|
}
|
|
};
|
|
|
|
this.completeJob(jobId, result);
|
|
}
|
|
|
|
} catch (error) {
|
|
this.failJob(jobId, error instanceof Error ? error.message : 'Unknown error');
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Run song matching job
|
|
*/
|
|
private async runSongMatchingJob(jobId: string, options?: any): Promise<void> {
|
|
try {
|
|
// Import here to avoid circular dependencies
|
|
const { SongMatchingService } = await import('./songMatchingService.js');
|
|
const { MusicFile } = await import('../models/MusicFile.js');
|
|
const { Song } = await import('../models/Song.js');
|
|
|
|
const matchingService = new SongMatchingService();
|
|
|
|
this.updateProgress(jobId, {
|
|
message: 'Finding unmatched music files...',
|
|
current: 0,
|
|
total: 0
|
|
});
|
|
|
|
// Get all unmatched music files
|
|
const unmatchedMusicFiles = await MusicFile.find({ songId: { $exists: false } });
|
|
|
|
this.updateProgress(jobId, {
|
|
message: `Found ${unmatchedMusicFiles.length} unmatched music files`,
|
|
current: 0,
|
|
total: unmatchedMusicFiles.length
|
|
});
|
|
|
|
if (unmatchedMusicFiles.length === 0) {
|
|
this.updateProgress(jobId, {
|
|
message: 'No unmatched music files found',
|
|
current: 0,
|
|
total: 0
|
|
});
|
|
|
|
this.completeJob(jobId, {
|
|
linked: 0,
|
|
unmatched: 0,
|
|
total: 0
|
|
});
|
|
return;
|
|
}
|
|
|
|
// Get all songs for matching
|
|
const allSongs = await Song.find({});
|
|
|
|
this.updateProgress(jobId, {
|
|
message: `Starting matching process with ${allSongs.length} songs...`,
|
|
current: 0,
|
|
total: unmatchedMusicFiles.length
|
|
});
|
|
|
|
let linked = 0;
|
|
let unmatched = 0;
|
|
const batchSize = 50;
|
|
const musicFileUpdates: any[] = [];
|
|
const songUpdates: any[] = [];
|
|
|
|
for (let i = 0; i < unmatchedMusicFiles.length; i++) {
|
|
const musicFile = unmatchedMusicFiles[i];
|
|
|
|
this.updateProgress(jobId, {
|
|
message: `Matching music files`,
|
|
current: i + 1,
|
|
total: unmatchedMusicFiles.length
|
|
});
|
|
|
|
try {
|
|
// Get matching suggestions
|
|
const matches = await matchingService.matchMusicFileToSongs(musicFile, {
|
|
minConfidence: options?.minConfidence || 0.7,
|
|
enableFuzzyMatching: options?.enableFuzzyMatching !== false,
|
|
enablePartialMatching: options?.enablePartialMatching !== false,
|
|
maxResults: 1
|
|
});
|
|
|
|
if (matches.length > 0 && matches[0].confidence >= (options?.minConfidence || 0.7)) {
|
|
const bestMatch = matches[0];
|
|
|
|
// Prepare updates
|
|
musicFileUpdates.push({
|
|
updateOne: {
|
|
filter: { _id: musicFile._id },
|
|
update: { songId: bestMatch.song._id }
|
|
}
|
|
});
|
|
|
|
songUpdates.push({
|
|
updateOne: {
|
|
filter: { _id: bestMatch.song._id },
|
|
update: {
|
|
$set: {
|
|
's3File.musicFileId': musicFile._id,
|
|
's3File.s3Key': musicFile.s3Key,
|
|
's3File.s3Url': musicFile.s3Url,
|
|
's3File.streamingUrl': musicFile.s3Key ? `${process.env.S3_ENDPOINT}/${process.env.S3_BUCKET_NAME}/${musicFile.s3Key}` : undefined,
|
|
's3File.hasS3File': true
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
linked++;
|
|
} else {
|
|
unmatched++;
|
|
}
|
|
|
|
// Process batch updates
|
|
if (musicFileUpdates.length >= batchSize) {
|
|
this.updateProgress(jobId, {
|
|
message: `Saving to database`,
|
|
current: i + 1,
|
|
total: unmatchedMusicFiles.length
|
|
});
|
|
|
|
await MusicFile.bulkWrite(musicFileUpdates);
|
|
await Song.bulkWrite(songUpdates);
|
|
|
|
musicFileUpdates.length = 0;
|
|
songUpdates.length = 0;
|
|
}
|
|
|
|
} catch (error) {
|
|
console.error(`Error matching ${musicFile.originalName}:`, error);
|
|
unmatched++;
|
|
}
|
|
}
|
|
|
|
// Save remaining updates
|
|
if (musicFileUpdates.length > 0) {
|
|
this.updateProgress(jobId, {
|
|
message: `Saving to database`,
|
|
current: unmatchedMusicFiles.length,
|
|
total: unmatchedMusicFiles.length
|
|
});
|
|
|
|
await MusicFile.bulkWrite(musicFileUpdates);
|
|
await Song.bulkWrite(songUpdates);
|
|
}
|
|
|
|
const result = {
|
|
linked,
|
|
unmatched,
|
|
total: unmatchedMusicFiles.length
|
|
};
|
|
|
|
this.completeJob(jobId, result);
|
|
|
|
} catch (error) {
|
|
this.failJob(jobId, error instanceof Error ? error.message : 'Unknown error');
|
|
throw error;
|
|
}
|
|
}
|
|
}
|
|
|
|
export const backgroundJobService = new BackgroundJobService();
|