Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TCP client to detect connection closures #108

Closed
wants to merge 5 commits into from

Conversation

markkuhn
Copy link
Contributor

@markkuhn markkuhn commented Aug 3, 2022

Issue #, if available: #99

Description of changes:
Added an additional empty os.write() to sendMessage() to test socket connection.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@markkuhn markkuhn changed the title Add socket EOS detection Add retry queue for dropped tcp packets Aug 3, 2022
@brandondahler
Copy link

brandondahler commented Aug 4, 2022

Executing the reproduction steps from #99, it doesn't look like it resolves the problem specifically with half-open sockets:

$ nc -l 127.0.0.1 25888 -k
{"_aws":{"Timestamp":1659633805417,"CloudWatchMetrics":[{"Namespace":"aws-embedded-metrics","Metrics":[{"Name":"Test","Unit":"None"}],"Dimensions":[["LogGroup","ServiceName","ServiceType"]]}],"LogGroupName":"Unknown-metrics"},"LogGroup":"Unknown-metrics","ServiceName":"Unknown","Test":1.0,"ServiceType":"Unknown"}
^C
$ nc -l 127.0.0.1 25888 -k
<no output>

Updating the reproducing code to have an extra log message also doesn't seem to work quite as expected -- it appears to duplicate the second message on retry:

final Configuration config = EnvironmentConfigurationProvider.getConfig();
config.setEnvironmentOverride(Environments.Agent);

final Environment environment = new EnvironmentProvider()
    .resolveEnvironment()
    .get();

final MetricsLogger metricsLogger = new MetricsLogger(environment);
metricsLogger.putMetric("Test", 1.0d);
metricsLogger.flush();

System.out.println("Sleeping, restart agent socket now...");
Thread.sleep(10_000);

final MetricsLogger metricsLogger2 = new MetricsLogger(environment);
metricsLogger2.putMetric("Test2", 1.0d);
metricsLogger2.flush();

final MetricsLogger metricsLogger3 = new MetricsLogger(environment);
metricsLogger3.putMetric("Test3", 1.0d);
metricsLogger3.flush();

environment.getSink()
    .shutdown()
    .join();

Output:

$ nc -l 127.0.0.1 25888 -k
{"_aws":{"Timestamp":1659633935242,"CloudWatchMetrics":[{"Namespace":"aws-embedded-metrics","Metrics":[{"Name":"Test","Unit":"None"}],"Dimensions":[["LogGroup","ServiceName","ServiceType"]]}],"LogGroupName":"Unknown-metrics"},"LogGroup":"Unknown-metrics","ServiceName":"Unknown","Test":1.0,"ServiceType":"Unknown"}
^C
$ nc -l 127.0.0.1 25888 -k
{"_aws":{"Timestamp":1659633945311,"CloudWatchMetrics":[{"Namespace":"aws-embedded-metrics","Metrics":[{"Name":"Test2","Unit":"None"}],"Dimensions":[["LogGroup","ServiceName","ServiceType"]]}],"LogGroupName":"Unknown-metrics"},"LogGroup":"Unknown-metrics","ServiceName":"Unknown","ServiceType":"Unknown","Test2":1.0}
{"_aws":{"Timestamp":1659633945311,"CloudWatchMetrics":[{"Namespace":"aws-embedded-metrics","Metrics":[{"Name":"Test3","Unit":"None"}],"Dimensions":[["LogGroup","ServiceName","ServiceType"]]}],"LogGroupName":"Unknown-metrics"},"LogGroup":"Unknown-metrics","ServiceName":"Unknown","ServiceType":"Unknown","Test3":1.0}
{"_aws":{"Timestamp":1659633945311,"CloudWatchMetrics":[{"Namespace":"aws-embedded-metrics","Metrics":[{"Name":"Test3","Unit":"None"}],"Dimensions":[["LogGroup","ServiceName","ServiceType"]]}],"LogGroupName":"Unknown-metrics"},"LogGroup":"Unknown-metrics","ServiceName":"Unknown","ServiceType":"Unknown","Test3":1.0}

@markkuhn
Copy link
Contributor Author

markkuhn commented Aug 4, 2022

The retryQueue was interfering with sendMessageForMaxAttempts(). I have removed it and replaced it with an extra os.write(). This will write 1 byte to the stream as a way to verify the connection. If it fails, the sendMessageForMaxAttempts will take over and retry. If it passes then os.write(message.getBytes()) will proceed.

@markkuhn markkuhn changed the title Add retry queue for dropped tcp packets Fix TCP client to detect connection closures Aug 4, 2022
@markkuhn markkuhn requested review from evanuk and removed request for giancarlokc August 8, 2022 17:38
@@ -65,13 +63,22 @@ public synchronized void sendMessage(String message) {
}

try {
// Write a space to the socket to verify connection before sending event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment should include why we need this, rather what.

@brandondahler
Copy link

I pulled the latest changes and attempted to run the reproduction code again (see previous comment).

My results were as follows:

  1. Two messages with 10s delay between to allow CTRL+C and re-start:
$ nc -l 127.0.0.1 25888 -k
 {"_aws":{"Timestamp":1660165747821,"CloudWatchMetrics":[{"Namespace":"aws-embedded-metrics","Metrics":[{"Name":"Test","Unit":"None"}],"Dimensions":[["LogGroup","ServiceName","ServiceType"]]}],"LogGroupName":"Unknown-metrics"},"LogGroup":"Unknown-metrics","ServiceName":"Unknown","Test":1.0,"ServiceType":"Unknown"}
^C
$ nc -l 127.0.0.1 25888 -k
<no output>
  1. Three messages, 10s delay between 1st and 2nd to allow CTRL+C and re-start:
$ nc -l 127.0.0.1 25888 -k
 {"_aws":{"Timestamp":1660165887524,"CloudWatchMetrics":[{"Namespace":"aws-embedded-metrics","Metrics":[{"Name":"Test","Unit":"None"}],"Dimensions":[["LogGroup","ServiceName","ServiceType"]]}],"LogGroupName":"Unknown-metrics"},"LogGroup":"Unknown-metrics","ServiceName":"Unknown","Test":1.0,"ServiceType":"Unknown"}
^C
$ nc -l 127.0.0.1 25888 -k
 {"_aws":{"Timestamp":1660165897592,"CloudWatchMetrics":[{"Namespace":"aws-embedded-metrics","Metrics":[{"Name":"Test3","Unit":"None"}],"Dimensions":[["LogGroup","ServiceName","ServiceType"]]}],"LogGroupName":"Unknown-metrics"},"LogGroup":"Unknown-metrics","ServiceName":"Unknown","ServiceType":"Unknown","Test3":1.0}

Neither of these tests resulted in the expected behavior.

@markkuhn
Copy link
Contributor Author

markkuhn commented Aug 12, 2022

@brandondahler I ran tests again and I see now that in about 5% of the cases, the socket doesn't detect its closure in time and the message gets lost.
I would like to use your solution but after running some tests I can see that it does sometimes miss a message. At times it also sends a message twice which isn't something we can have. I'm going to do some digging to see if there's another way to approach this.

@markkuhn markkuhn added the help wanted Extra attention is needed label Aug 19, 2022
@markkuhn markkuhn self-assigned this Aug 23, 2022
@markkuhn markkuhn removed the request for review from giancarlokc September 8, 2022 16:49
@markkuhn markkuhn closed this Nov 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants