You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
I'm trying to implement a very simple REST API with just one endpoint using shelf, running multiple instances (isolates), and I want to integrate it with Prometheus and Grafana to display a graph of requests per second and/or total requests per day. I made this implementation using the stream_isolate and prometheus_client packages. I would like to know if there is a simpler and cleaner way to do this.
import'dart:async';
import'dart:convert';
import'dart:isolate';
import'package:eloquent/eloquent.dart';
import'package:shelf/shelf.dart';
import'package:shelf_router/shelf_router.dart';
import'package:shelf/shelf_io.dart'as io;
import'package:stack_trace/stack_trace.dart';
import'package:new_sali_backend/src/db/db_layer.dart';
import'package:new_sali_backend/src/modules/protocolo/repositories/processo_repository.dart';
import'package:new_sali_core/src/utils/core_utils.dart';
import'package:new_sali_core/src/models/status_message.dart';
import'package:prometheus_client/prometheus_client.dart';
import'package:prometheus_client/runtime_metrics.dart'as runtime_metrics;
import'package:prometheus_client_shelf/shelf_metrics.dart'as shelf_metrics;
import'package:prometheus_client/format.dart'as format;
import'package:args/args.dart'show ArgParser;
import'shelf_cors_headers_base.dart';
import'stream_isolate.dart';
// To load-test the endpoint from a shell (8 parallel clients, 400 requests):
// xargs -I % -P 8 curl "http://192.168.66.123:3161/api/v1/protocolo/processos/public/site/2023/10" < <(printf '%s\n' {1..400})

/// Headers attached to every JSON response produced by this server.
const defaultHeaders = {'Content-Type': 'application/json;charset=utf-8'};
/// Builds a JSON error [Response].
///
/// The body is an object with the keys `is_error` (always `true`),
/// `status_code`, `message`, `exception` and `stackTrace`. The last two are
/// the `toString()` of [exception] / [stackTrace], or `null` when omitted.
/// [statusCode] is used both in the body and as the HTTP status and
/// defaults to 400.
Response responseError(String message,
    {dynamic exception, dynamic stackTrace, int statusCode = 400}) {
  final body = jsonEncode({
    'is_error': true,
    'status_code': statusCode,
    'message': message,
    'exception': exception?.toString(),
    'stackTrace': stackTrace?.toString(),
  });
  return Response(statusCode, body: body, headers: defaultHeaders);
}
/// Path prefix prepended to the versioned API routes.
const basePath = '/api/v1';
/// Channels to the spawned worker isolates.
///
/// Each element is a single-entry map from the worker index to its
/// bidirectional channel.
final List<Map<int, BidirectionalStreamIsolate>> streamIsolates = [];
/// Parses the command-line options and spawns the HTTP worker isolates.
///
/// Options:
/// * `--address` / `-a`: bind address, defaults to `0.0.0.0`.
/// * `--port` / `-p`: TCP port, defaults to `3161`.
/// * `--isolates` / `-i`: number of worker isolates, defaults to `3`.
///
/// The main isolate does not serve HTTP itself; it only relays every message
/// received from one worker to all workers (see [receiveAndPass]). For that
/// reason exactly `--isolates` workers are spawned: the previous
/// `numberOfIsolates - 1` bound started one worker too few and started no
/// server at all for `--isolates 1`.
///
/// Throws a [FormatException] when `port` or `isolates` is not an integer and
/// an [ArgumentError] when `isolates` is smaller than 1.
void main(List<String> args) async {
  final parser = ArgParser()
    ..addOption('address', abbr: 'a', defaultsTo: '0.0.0.0')
    ..addOption('port', abbr: 'p', defaultsTo: '3161')
    ..addOption('isolates', abbr: 'i', defaultsTo: '3');
  final argsParsed = parser.parse(args);

  final address = argsParsed['address'] as String;
  final port = int.parse(argsParsed['port'] as String);
  final numberOfIsolates = int.parse(argsParsed['isolates'] as String);
  if (numberOfIsolates < 1) {
    // Without at least one worker nothing would ever listen on the port.
    throw ArgumentError.value(
        numberOfIsolates, 'isolates', 'must be at least 1');
  }

  for (var i = 0; i < numberOfIsolates; i++) {
    final streamIsolate = await StreamIsolate.spawnBidirectional(isolateMain,
        debugName: i.toString(), argument: [i, address, port]);
    streamIsolates.add({i: streamIsolate});
    // Fan every message coming out of this worker back out to all workers.
    streamIsolate.stream.listen((event) => receiveAndPass(event, i));
  }
}
/// Receives a message from one worker isolate and sends it to all isolates.
///
/// [event] is forwarded unchanged to every channel registered in
/// [streamIsolates], including the one it came from. [idx] is the index of
/// the originating worker; it is currently not used.
void receiveAndPass(event, int idx) {
  for (final entry in streamIsolates) {
    // Each entry holds a single index -> channel pair.
    for (final isolate in entry.values) {
      isolate.send(event);
    }
  }
}
/// Entry point executed inside each worker isolate.
///
/// [inc] is the stream of messages coming from the main isolate; every
/// message received on it increments this isolate's `http_requests_total`
/// counter by one. [args] must be a list of `[id, address, port]`.
///
/// Starts the HTTP server (without waiting for it to be bound) and returns
/// the stream on which the server reports handled requests.
Stream isolateMain(Stream inc, dynamic args) {
  final arguments = args as List;
  final id = arguments[0] as int;
  final address = arguments[1] as String;
  final port = arguments[2] as int;

  final streamController = StreamController.broadcast();

  // A dedicated registry is used here rather than
  // CollectorRegistry.defaultRegistry.
  final reg = CollectorRegistry();

  // Register default runtime metrics.
  runtime_metrics.register(reg);

  // Register the total number of http requests.
  final httpRequestsTotal = Counter(
      name: 'http_requests_total', help: 'Total number of http api requests');
  httpRequestsTotal.register(reg);

  // Listen for messages from main; each one stands for one handled request.
  inc.listen((msg) {
    httpRequestsTotal.inc();
  });

  _startServer([id, streamController, reg, address, port]);
  return streamController.stream;
}
/// Builds the request pipeline and starts serving HTTP in this isolate.
///
/// [args] is positional: `[id, streamController, registry, address, port]`.
/// The `id` at index 0 is not read here.
///
/// The pipeline applies, in order: CORS headers, the shelf metrics
/// middleware bound to the registry, a middleware that adds `'+1'` to the
/// stream controller after every request whose path does not contain
/// `metrics`, request logging, and finally the router.
///
/// The server is bound with `shared: true`. The returned future completes
/// once the server is bound and the "Serving at" line has been printed.
Future<void> _startServer(List args) async {
  final streamController = args[1] as StreamController;
  final reg = args[2] as CollectorRegistry;
  final address = args[3] as String;
  final port = args[4] as int;

  final app = Router();
  routes(app, reg);

  final handler = Pipeline()
      .addMiddleware(corsHeaders())
      .addMiddleware(shelf_metrics.register(reg))
      .addMiddleware((innerHandler) {
        return (request) async {
          // Every time an http request is handled, report it so that the
          // counter is increased by one.
          final resp = await innerHandler(request);
          if (!request.url.path.contains('metrics')) {
            // Send the notification towards main.
            streamController.add('+1');
          }
          return resp;
        };
      })
      .addMiddleware(logRequestsCustom())
      .addHandler(app);

  final server = await io.serve(handler, address, port, shared: true);
  server.defaultResponseHeaders.remove('X-Frame-Options', 'SAMEORIGIN');
  print('Serving at http://${server.address.host}:${server.port}');
}
/// Registers the HTTP routes on [app].
///
/// * `GET /metrics` writes the samples collected from [reg] in the
///   Prometheus text format.
/// * `GET $basePath/protocolo/processos/public/site/<ano>/<codigo>` looks up
///   a single "processo" by code and year and returns it as JSON.
void routes(Router app, CollectorRegistry reg) {
  // Register a handler to expose the metrics in the Prometheus text format.
  app.get('/metrics', (Request request) async {
    final buffer = StringBuffer();
    final metrics = await reg.collectMetricFamilySamples();
    format.write004(buffer, metrics);
    return Response.ok(
      buffer.toString(),
      headers: {'Content-Type': format.contentType},
    );
  });

  app.get('$basePath/protocolo/processos/public/site/<ano>/<codigo>',
      (Request request, String ano, String codigo) async {
    //final key = request.headers['Authorization'];
    Connection? conn;
    try {
      final codProcesso = int.tryParse(codigo);
      if (codProcesso == null) {
        return responseError('codProcesso invalido');
      }
      conn = await DBLayer().connect();
      final procRepo = ProcessoRepository(conn);
      final proc = await procRepo.getProcessoByCodigoPublic(codProcesso, ano);
      return Response.ok(
        jsonEncode(proc, toEncodable: SaliCoreUtils.customJsonEncode),
        headers: defaultHeaders,
      );
    } catch (e, s) {
      print('public_backend@getProcessoByCodigoPublic $e $s');
      return responseError(StatusMessage.ERROR_GENERIC);
    } finally {
      // Release the connection exactly once on every path. Previously a
      // failure after the in-line disconnect caused a second disconnect in
      // the catch block.
      try {
        await conn?.disconnect();
      } catch (e, s) {
        // A failed disconnect must not replace the response being returned.
        print('public_backend@getProcessoByCodigoPublic disconnect $e $s');
      }
    }
  });
}
/// Middleware that logs every request together with the handling isolate.
///
/// For a handled request, [logger] is called with the line built by
/// `_message` and `isError == false`. When the inner handler fails, [logger]
/// is called with the text built by `_errorMessage` and `isError == true`,
/// and the error is then propagated to the caller. A [HijackException] is
/// propagated without being logged.
///
/// When [logger] is omitted, messages are printed to stdout and error
/// messages are prefixed with `[ERROR]`.
Middleware logRequestsCustom(
    {void Function(String message, bool isError)? logger}) {
  final theLogger = logger ?? _defaultLogger;
  return (innerHandler) {
    return (request) async {
      final startTime = DateTime.now();
      final watch = Stopwatch()..start();

      final Response response;
      try {
        response = await innerHandler(request);
      } on HijackException {
        // Not a failure; pass it through without logging.
        rethrow;
      } catch (error, stackTrace) {
        final msg = _errorMessage(startTime, request.requestedUri,
            request.method, watch.elapsed, error, stackTrace);
        theLogger(msg, true);
        // Propagate the original error (and its stack trace) after logging.
        rethrow;
      }

      // Logged outside the try block so that a failing logger is not itself
      // reported as a request error.
      final msg = _message(startTime, response.statusCode,
          request.requestedUri, request.method, watch.elapsed);
      theLogger(msg, false);
      return response;
    };
  };
}
/// Returns [query] prefixed with `?`, or an empty string when [query] is
/// empty.
String _formatQuery(String query) {
  return query.isEmpty ? '' : '?$query';
}
/// Builds the log line for a handled request.
///
/// The line contains, in order: [requestTime] in ISO-8601 form,
/// [elapsedTime] left-padded to 15 characters, [method] right-padded to 7
/// characters, `[statusCode]`, the path and query of [requestedUri], and the
/// debug name of the current isolate.
String _message(DateTime requestTime, int statusCode, Uri requestedUri,
    String method, Duration elapsedTime) {
  return '${requestTime.toIso8601String()} '
      '${elapsedTime.toString().padLeft(15)} '
      '${method.padRight(7)} [$statusCode] ' // 7 - longest standard HTTP method
      '${requestedUri.path}${_formatQuery(requestedUri.query)}'
      ' isolate: ${Isolate.current.debugName}';
}
/// Builds the log text for a request whose handler failed.
///
/// The first line holds [requestTime], [elapsedTime], [method] and the path
/// and query of [requestedUri], separated by tabs. It is followed by [error]
/// on its own line and then the stack chain.
///
/// When [stack] is given, the chain is built from it with core and `shelf`
/// frames folded and then made terse. Otherwise the current stack chain is
/// used.
String _errorMessage(DateTime requestTime, Uri requestedUri, String method,
    Duration elapsedTime, Object error, StackTrace? stack) {
  // Only capture the current chain when no stack trace was supplied.
  final chain = stack == null
      ? Chain.current()
      : Chain.forTrace(stack)
          .foldFrames((frame) => frame.isCore || frame.package == 'shelf')
          .terse;

  final msg = '$requestTime\t$elapsedTime\t$method\t${requestedUri.path}'
      '${_formatQuery(requestedUri.query)}\n$error';
  return '$msg\n$chain';
}
/// Default log sink: prints [msg] to stdout, prefixed with `[ERROR] ` when
/// [isError] is true.
void _defaultLogger(String msg, bool isError) {
  print(isError ? '[ERROR] $msg' : msg);
}
The text was updated successfully, but these errors were encountered:
It would also be useful to have a simpler and cleaner way to start a multi-instance (multi-isolate) Shelf application, similar to what the Angel3 framework offers.
I'm trying to implement a very simple REST API with just one endpoint using shelf, running multiple instances (isolates), and I want to integrate it with Prometheus and Grafana to display a graph of requests per second and/or total requests per day. I made this implementation using the stream_isolate and prometheus_client packages. I would like to know if there is a simpler and cleaner way to do this.
The text was updated successfully, but these errors were encountered: