diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/upload/FileUploadController.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/upload/FileUploadController.java index b3ab682dd..7ad63c352 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/upload/FileUploadController.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/upload/FileUploadController.java @@ -34,6 +34,7 @@ import org.springframework.web.multipart.MultipartFile; import us.dot.its.jpo.ode.OdeProperties; +import us.dot.its.jpo.ode.exporter.StompStringExporter; import us.dot.its.jpo.ode.importer.ImporterDirectoryWatcher; import us.dot.its.jpo.ode.importer.ImporterDirectoryWatcher.ImporterFileType; import us.dot.its.jpo.ode.storage.StorageFileNotFoundException; @@ -41,6 +42,8 @@ @RestController public class FileUploadController { + private static final String FILTERED_OUTPUT_TOPIC = "/topic/filtered_messages"; + private static final String UNFILTERED_OUTPUT_TOPIC = "/topic/unfiltered_messages"; private static Logger logger = LoggerFactory.getLogger(FileUploadController.class); @@ -65,6 +68,20 @@ public FileUploadController( // Create the importers that watch folders for new/modified files threadPool.submit(new ImporterDirectoryWatcher(odeProperties, logPath, backupPath, failurePath, ImporterFileType.LOG_FILE, odeProperties.getFileWatcherPeriod())); + + // Create unfiltered exporters + threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeBsmJson())); + threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeTimJson())); + threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeSpatJson())); + threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeMapJson())); + threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeSsmJson())); + threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeSrmJson())); + threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicDriverAlertJson())); + threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeTimBroadcastJson())); + + // Create filtered exporters + threadPool.submit(new StompStringExporter(odeProperties, FILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicFilteredOdeBsmJson())); + threadPool.submit(new StompStringExporter(odeProperties, FILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicFilteredOdeTimJson())); } @PostMapping("/upload/{type}")