Skip to content

Commit

Permalink
Fixed leaks in WebSocketScopes and JDKSchedulingService
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Aug 30, 2022
1 parent 3b8e62c commit 47d62bf
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 23 deletions.
15 changes: 9 additions & 6 deletions common/src/main/java/org/red5/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public class Server implements IServer, ApplicationContextAware, InitializingBea

public Set<IConnectionListener> connectionListeners = new CopyOnWriteArraySet<IConnectionListener>();

// delay between posting a notification and informing any listeners
public long notificationDelay = 5L;

/**
* Setter for Spring application context
*
Expand Down Expand Up @@ -297,7 +300,7 @@ public void removeListener(IConnectionListener listener) {
* the scope that was created
*/
public void notifyScopeCreated(IScope scope) {
schedulingService.addScheduledOnceJob(10, new ScheduledNotificationJob(JobAction.CREATED, scope));
schedulingService.addScheduledOnceJob(notificationDelay, new ScheduledNotificationJob(JobAction.CREATED, scope));
}

/**
Expand All @@ -307,7 +310,7 @@ public void notifyScopeCreated(IScope scope) {
* the scope that was removed
*/
public void notifyScopeRemoved(IScope scope) {
schedulingService.addScheduledOnceJob(10, new ScheduledNotificationJob(JobAction.REMOVED, scope));
schedulingService.addScheduledOnceJob(notificationDelay, new ScheduledNotificationJob(JobAction.REMOVED, scope));
}

/**
Expand All @@ -317,7 +320,7 @@ public void notifyScopeRemoved(IScope scope) {
* the scope that was added
*/
public void notifyBasicScopeAdded(IBasicScope scope) {
schedulingService.addScheduledOnceJob(10, new ScheduledNotificationJob(JobAction.BASIC_ADD, scope));
schedulingService.addScheduledOnceJob(notificationDelay, new ScheduledNotificationJob(JobAction.BASIC_ADD, scope));
}

/**
Expand All @@ -327,7 +330,7 @@ public void notifyBasicScopeAdded(IBasicScope scope) {
* the scope that was removed
*/
public void notifyBasicScopeRemoved(IBasicScope scope) {
schedulingService.addScheduledOnceJob(10, new ScheduledNotificationJob(JobAction.BASIC_REMOVE, scope));
schedulingService.addScheduledOnceJob(notificationDelay, new ScheduledNotificationJob(JobAction.BASIC_REMOVE, scope));
}

/**
Expand All @@ -337,7 +340,7 @@ public void notifyBasicScopeRemoved(IBasicScope scope) {
* the new connection
*/
public void notifyConnected(IConnection conn) {
schedulingService.addScheduledOnceJob(10, new ScheduledNotificationJob(JobAction.CONNECTED, conn));
schedulingService.addScheduledOnceJob(notificationDelay, new ScheduledNotificationJob(JobAction.CONNECTED, conn));
}

/**
Expand All @@ -347,7 +350,7 @@ public void notifyConnected(IConnection conn) {
* the disconnected connection
*/
public void notifyDisconnected(final IConnection conn) {
schedulingService.addScheduledOnceJob(10, new ScheduledNotificationJob(JobAction.DISCONNECTED, conn));
schedulingService.addScheduledOnceJob(notificationDelay, new ScheduledNotificationJob(JobAction.DISCONNECTED, conn));
}

// job actions for scope notifications
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ public String addScheduledJob(int interval, IScheduledJob job) {
if (schedulingService == null) {
schedulingService = (ISchedulingService) ScopeUtils.getScopeService(scope, ISchedulingService.class, JDKSchedulingService.class, false);
}
// the caller is requred to call removal one they have finished with this job
return schedulingService.addScheduledJob(interval, job);
}

Expand Down Expand Up @@ -1025,6 +1026,7 @@ public String addScheduledJobAfterDelay(int interval, IScheduledJob job, int del
if (schedulingService == null) {
schedulingService = (ISchedulingService) ScopeUtils.getScopeService(scope, ISchedulingService.class, JDKSchedulingService.class, false);
}
// the caller is requred to call removal one they have finished with this job
return schedulingService.addScheduledJobAfterDelay(interval, job, delay);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ public String addScheduledJob(int interval, IScheduledJob job) {
Map<String, Object> jobData = new HashMap<>();
jobData.put(ISchedulingService.SCHEDULING_SERVICE, this);
jobData.put(ISchedulingService.SCHEDULED_JOB, job);
// runnable task
JDKSchedulingServiceJob schedJob = new JDKSchedulingServiceJob();
schedJob.setJobDataMap(jobData);
// runnable task with false to prevent removal after it runs, since this job is meant to iterate
JDKSchedulingServiceJob schedJob = new JDKSchedulingServiceJob(name, jobData, false);
// schedule it to run at interval
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(schedJob, interval, interval, TimeUnit.MILLISECONDS);
// add to the key map
Expand All @@ -103,8 +102,7 @@ public String addScheduledOnceJob(Date date, IScheduledJob job) {
jobData.put(ISchedulingService.SCHEDULING_SERVICE, this);
jobData.put(ISchedulingService.SCHEDULED_JOB, job);
// runnable task
JDKSchedulingServiceJob schedJob = new JDKSchedulingServiceJob();
schedJob.setJobDataMap(jobData);
JDKSchedulingServiceJob schedJob = new JDKSchedulingServiceJob(name, jobData);
// calculate the delay
long delay = date.getTime() - System.currentTimeMillis();
// schedule it to run once after the specified delay
Expand All @@ -123,8 +121,7 @@ public String addScheduledOnceJob(long timeDelta, IScheduledJob job) {
jobData.put(ISchedulingService.SCHEDULING_SERVICE, this);
jobData.put(ISchedulingService.SCHEDULED_JOB, job);
// runnable task
JDKSchedulingServiceJob schedJob = new JDKSchedulingServiceJob();
schedJob.setJobDataMap(jobData);
JDKSchedulingServiceJob schedJob = new JDKSchedulingServiceJob(name, jobData);
// schedule it to run once after the specified delay
ScheduledFuture<?> future = scheduler.schedule(schedJob, timeDelta, TimeUnit.MILLISECONDS);
// add to the key map
Expand All @@ -139,9 +136,8 @@ public String addScheduledJobAfterDelay(int interval, IScheduledJob job, int del
Map<String, Object> jobData = new HashMap<>();
jobData.put(ISchedulingService.SCHEDULING_SERVICE, this);
jobData.put(ISchedulingService.SCHEDULED_JOB, job);
// runnable task
JDKSchedulingServiceJob schedJob = new JDKSchedulingServiceJob();
schedJob.setJobDataMap(jobData);
// runnable task with false to prevent removal after it runs, since this job is meant to iterate
JDKSchedulingServiceJob schedJob = new JDKSchedulingServiceJob(name, jobData, false);
// schedule it to run at interval
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(schedJob, delay, interval, TimeUnit.MILLISECONDS);
// add to the key map
Expand Down Expand Up @@ -182,10 +178,11 @@ public void resumeScheduledJob(String name) {
public void removeScheduledJob(String name) {
try {
ScheduledFuture<?> future = keyMap.remove(name);
if (future != null) {
// check done before we attempt cancelling
if (future != null && !future.isDone()) {
future.cancel(interruptOnRemove);
} else {
log.debug("No key found for job: {}", name);
log.debug("No key found for job: {} or the job was done", name);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,37 @@
*/
public class JDKSchedulingServiceJob implements Runnable {

private Logger log = LoggerFactory.getLogger(JDKSchedulingServiceJob.class);
private final static Logger log = LoggerFactory.getLogger(JDKSchedulingServiceJob.class);

/**
* Job data map
*/
private Map<String, Object> jobDataMap;
private final Map<String, Object> jobDataMap;

public void setJobDataMap(Map<String, Object> jobDataMap) {
private final String jobName;

// set this flag to prevent removal within the internal run() of the scheduled job
private final boolean autoRemove;

public JDKSchedulingServiceJob(String name, Map<String, Object> dataMap) {
this.jobDataMap = dataMap;
log.debug("Set job data map: {}", jobDataMap);
this.jobName = name;
this.autoRemove = true;
}

public JDKSchedulingServiceJob(String name, Map<String, Object> dataMap, boolean autoRemove) {
this.jobDataMap = dataMap;
log.debug("Set job data map: {}", jobDataMap);
this.jobDataMap = jobDataMap;
this.jobName = name;
this.autoRemove = autoRemove;
}

public void run() {
//log.debug("execute");
ISchedulingService service = (ISchedulingService) jobDataMap.get(ISchedulingService.SCHEDULING_SERVICE);
IScheduledJob job = null;
try {
ISchedulingService service = (ISchedulingService) jobDataMap.get(ISchedulingService.SCHEDULING_SERVICE);
job = (IScheduledJob) jobDataMap.get(ISchedulingService.SCHEDULED_JOB);
job.execute(service);
} catch (Throwable e) {
Expand All @@ -47,6 +61,13 @@ public void run() {
} else {
log.warn("Job {} execution failed", job.toString(), e);
}
} finally {
// remove the job
if (autoRemove) {
service.removeScheduledJob(jobName);
}
// clear the map
jobDataMap.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,14 @@ public boolean addWebSocketScope(WebSocketScope webSocketScope) {
try {
wsConn.sendPing(PING_BYTES);
} catch (Exception e) {
log.debug("Exception pinging connection: {} connection will be closed", wsConn.getSessionId(), e);
// if the ping fails, consider them gone
wsConn.close();
}
} else {
log.debug("Removing unconnected connection: {} during ping loop", wsConn.getSessionId());
// if the connection isn't connected, remove them
wsScope.removeConnection(wsConn);
}
});
log.trace("finished pinging scope: {}", sName);
Expand Down

0 comments on commit 47d62bf

Please sign in to comment.