Skip to content

Commit

Permalink
Added back the mystery threads
Browse files Browse the repository at this point in the history
  • Loading branch information
drewjj committed Mar 20, 2024
1 parent 811e439 commit a3b0ec1
Showing 1 changed file with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@
import org.springframework.web.multipart.MultipartFile;

import us.dot.its.jpo.ode.OdeProperties;
import us.dot.its.jpo.ode.exporter.StompStringExporter;
import us.dot.its.jpo.ode.importer.ImporterDirectoryWatcher;
import us.dot.its.jpo.ode.importer.ImporterDirectoryWatcher.ImporterFileType;
import us.dot.its.jpo.ode.storage.StorageFileNotFoundException;
import us.dot.its.jpo.ode.storage.StorageService;

@RestController
public class FileUploadController {
private static final String FILTERED_OUTPUT_TOPIC = "/topic/filtered_messages";
private static final String UNFILTERED_OUTPUT_TOPIC = "/topic/unfiltered_messages";

private static Logger logger = LoggerFactory.getLogger(FileUploadController.class);

Expand All @@ -65,6 +68,20 @@ public FileUploadController(

// Create the importers that watch folders for new/modified files
threadPool.submit(new ImporterDirectoryWatcher(odeProperties, logPath, backupPath, failurePath, ImporterFileType.LOG_FILE, odeProperties.getFileWatcherPeriod()));

// Create unfiltered exporters
threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeBsmJson()));
threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeTimJson()));
threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeSpatJson()));
threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeMapJson()));
threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeSsmJson()));
threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeSrmJson()));
threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicDriverAlertJson()));
threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeTimBroadcastJson()));

// Create filtered exporters
threadPool.submit(new StompStringExporter(odeProperties, FILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicFilteredOdeBsmJson()));
threadPool.submit(new StompStringExporter(odeProperties, FILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicFilteredOdeTimJson()));
}

@PostMapping("/upload/{type}")
Expand Down

0 comments on commit a3b0ec1

Please sign in to comment.