Skip to content

Commit

Permalink
Bug fixing - deadlock fixed.
Browse files Browse the repository at this point in the history
Version 0.2.0
* Deadlock fixed.
* Labels - correcting labels to be compliant with Grafana Loki.
* LogController - possibility to send logs before stop.
* LogController - possibility to interrupt worker thread.
* LogSenderSettings - possibility to set connection timeout.
* LogMonitor - on worker thread exit event added.
* Documentation updated.
  • Loading branch information
mj committed Nov 4, 2021
1 parent b6bf45c commit 98a4548
Show file tree
Hide file tree
Showing 17 changed files with 775 additions and 190 deletions.
32 changes: 20 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,35 @@ import pl.mjaron.tinyloki.*;

public class Sample {
public static void main(String[] args) {

// Initialize log controller instance with URL.
// Usually more than one instance in application doesn't make sense.
// Give Basic Authentication credentials or nulls.
// LogController owns separate thread which sends logs periodically.
LogController logController = TinyLoki.createAndStart(
"https://localhost/loki/api/v1/push", "user", "pass");
"https://localhost/loki/api/v1/push", "user", "pass");

// Create streams. It is thread-safe.
ILogStream stream = logController.createStream(
// Define stream labels...
TinyLoki.l(Labels.LEVEL, Labels.INFO)
.l("host", "MyComputerName")
.l("custom-label", "custom-value")
// Define stream labels...
TinyLoki.l(Labels.LEVEL, Labels.INFO)
.l("host", "MyComputerName")
.l("customLabel", "custom_value")
// Label names should start with letter
// and contain letters, digits and '_' only.
// Bad characters will be replaced by '_'.
// If first character is bad, it will be replaced by 'A'.
);

// ... new streams and other logs here (thread-safe).
stream.log("Hello world.");

// Optionally flush logs before application exit.
logController.softStop().waitForStop();
logController.softStop().hardStop();
}
}
```

## Integration

### Maven Central
Expand All @@ -52,19 +57,22 @@ public class Sample {

```gradle
dependencies {
implementation 'io.github.mjfryc:mjaron-tinyloki-java:0.1.22'
implementation 'io.github.mjfryc:mjaron-tinyloki-java:0.2.0'
}
```

### GitHub Packages

Click the [Packages section](https://github.com/mjfryc?tab=packages&repo_name=mjaron-tinyloki-java) on the right.

### Download directly

1. Click the [Packages section](https://github.com/mjfryc?tab=packages&repo_name=mjaron-tinyloki-java) on the right.
2. Find and download jar package from files list to e.g. `your_project_root/libs` dir.
3. Add this jar to project dependencies in build.gradle, e.g:

```gradle
dependencies {
implementation files(project.rootDir.absolutePath + '/libs/mjaron-tinyloki-java-0.1.22.jar')
implementation files(project.rootDir.absolutePath + '/libs/mjaron-tinyloki-java-0.2.0.jar')
}
```
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group 'io.github.mjfryc'
version '0.1.22'
version '0.2.0'

repositories {
mavenCentral()
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/pl/mjaron/tinyloki/ErrorLogMonitor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package pl.mjaron.tinyloki;

/**
* Prints only error messages.
*/
public class ErrorLogMonitor implements ILogMonitor {
@Override
public void send(final byte[] message) {
Expand All @@ -18,4 +21,13 @@ public void sendErr(final int status, final String message) {
public void onException(Exception exception) {
exception.printStackTrace();
}

@Override
public void onWorkerThreadExit(final boolean isSoft) {
if (isSoft) {
System.out.println("Worker thread exited correctly.");
} else {
System.err.println("Worker thread exited by interrupting.");
}
}
}
14 changes: 9 additions & 5 deletions src/main/java/pl/mjaron/tinyloki/ILogCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,32 @@ public interface ILogCollector {

/**
* Creates a new stream.
*
* @param labels Unique set of labels.
* @return New stream instance.
*/
ILogStream createStream(final Map<String, String> labels);

/**
* Gets data from streams and clears streams state.
* @return Encoded content of streams.
*
* @return Encoded content of streams or null if there is no new logs to send.
*/
byte[] collect();

/**
* HTTP content type describing data type of collect() result.
* @return
* HTTP Content-Type describing data type of collect() result.
*
* @return HTTP Content-Type header value.
*/
String contentType();

/**
* Stop thread until a new log will occur.
*
* @param timeout Time in milliseconds.
* @return True if any logs has occurred in given time.
* @return Count of logs in given time. It may not be exact count of logs and depends on implementation.
* @throws InterruptedException When given thread has been interrupted.
*/
boolean waitForLogs(final long timeout) throws InterruptedException;
int waitForLogs(final long timeout) throws InterruptedException;
}
23 changes: 22 additions & 1 deletion src/main/java/pl/mjaron/tinyloki/ILogMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,39 @@
*/
public interface ILogMonitor {

/**
* Called before sending given data to HTTP server.
*
* @param message Data reference.
*/
void send(final byte[] message);

/**
* Called on HTTP server response with good status.
*
* @param status HTTP status.
*/
void sendOk(final int status);

/**
* Handle send HTTP response error.
*
* @param status HTTP status code.
* @param message HTTP status message.
*/
void sendErr(final int status, final String message);

/**
* Called on any exception.
* @param exception
*
* @param exception Exception reference.
*/
void onException(final Exception exception);

/**
* Called when worker thread exits.
*
* @param isSoft Tells whether worker thread has exited without interrupting.
*/
void onWorkerThreadExit(final boolean isSoft);
}
5 changes: 3 additions & 2 deletions src/main/java/pl/mjaron/tinyloki/ILogStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public interface ILogStream {
void log(final long timestampMs, final String line);

/**
* Log line with current time.
* Thread-safe log line with current time.
*
* @param line Log content.
*/
default void log(final String line) {
Expand All @@ -23,7 +24,7 @@ default void log(final String line) {

/**
* Release log stream, so it isn't longer managed by its log collector.
* It is not mandatory to call if log lifetime is the same as application lifetime.
* It is not mandatory to call if this stream lifetime is the same as application lifetime.
*/
void release();
}
97 changes: 79 additions & 18 deletions src/main/java/pl/mjaron/tinyloki/JsonLogCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,122 @@
import java.util.List;
import java.util.Map;

/**
* Collects logs in a JSON format consistent with
* <a href="https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push">Loki Push API</a>.
*/
public class JsonLogCollector implements ILogCollector {

private final List<JsonLogStream> streams = new ArrayList<>();
private boolean logOccurred = false;
private int logEntriesCount = 0;
private final Object logEntriesLock = new Object();

/**
* Creates new instance of stream which will notify this collector about new logs.
* This collector will flush logs from the stream.
*
* @param labels Unique set of labels.
* @return New instance of a stream.
*/
@Override
synchronized public ILogStream createStream(Map<String, String> labels) {
JsonLogStream stream = new JsonLogStream(this, labels);
streams.add(stream);
return stream;
}

/**
* Given stream will not be flushed by this log collector anymore, so given stream will accumulate
* all next logs causing memory leaks (if it will not be garbage collected).
* Called by {@link JsonLogStream#release()}, so there is no need to call it directly.
*
* @param stream Stream to release.
*/
synchronized public void onStreamReleased(ILogStream stream) {
streams.remove((JsonLogStream) stream);
}

/**
* Create complete stream chunk in JSON string, stored as UTF-8 bytes.
*
* @return JSON bytes containing stream chunk.
*/
@Override
public byte[] collect() {
return collectAsString().getBytes(StandardCharsets.UTF_8);
final String collectedAsString = collectAsString();
if (collectedAsString == null) {
return null;
}
return collectedAsString.getBytes(StandardCharsets.UTF_8);
}

/**
* Create complete stream chunk in JSON string.
*
* @return JSON string containing stream chunk.
*/
synchronized public String collectAsString() {
final StringBuilder b = new StringBuilder("{\"streams\":[");
boolean isFirst = true;
boolean anyStreamNotEmpty = false;
for (final JsonLogStream stream : streams) {
if (isFirst) {
isFirst = false;
} else {
b.append(',');
final String streamData = stream.flush();
if (streamData != null) {
if (isFirst) {
isFirst = false;
} else {
b.append(',');
}
b.append(streamData);
anyStreamNotEmpty = true;
}
b.append(stream.flush());
}

if (!anyStreamNotEmpty) {
return null;
}

b.append("]}");
return b.toString();
}

/**
* Used in HTTP Content-Type header. Grafana Loki will interpret content as JSON.
*
* @return String complaint with HTTP Content-Type header, telling that content is a JSON data.
*/
@Override
public String contentType() {
return "application/json";
}

synchronized void logOccurred() {
logOccurred = true;
notify();
/**
* Notify log collector that any log has occurred.
* Thread safe.
*/
void logOccurred() {
synchronized (logEntriesLock) {
++logEntriesCount;
logEntriesLock.notify();
}
}

/**
* Blocking function. Waits until at least one log from any stream occurs.
*
* @param timeout Time in milliseconds.
* @return Collected logs count.
* @throws InterruptedException When this thread is interrupted during waiting.
*/
@Override
public synchronized boolean waitForLogs(final long timeout) throws InterruptedException {
if (!logOccurred) {
this.wait(timeout);
}
if (logOccurred) {
logOccurred = false;
return true;
public int waitForLogs(final long timeout) throws InterruptedException {
synchronized (logEntriesLock) {
if (logEntriesCount == 0) {
logEntriesLock.wait(timeout);
}
int logEntriesCountCopy = logEntriesCount;
logEntriesCount = 0;
return logEntriesCountCopy;
}
return false;
}
}
Loading

0 comments on commit 98a4548

Please sign in to comment.