diff --git a/python/micromegas/micromegas/flightsql/client.py b/python/micromegas/micromegas/flightsql/client.py index 0c57a53..225854d 100644 --- a/python/micromegas/micromegas/flightsql/client.py +++ b/python/micromegas/micromegas/flightsql/client.py @@ -253,3 +253,56 @@ def materialize_partitions( for rb in self.query_stream(sql): for index, row in rb.to_pandas().iterrows(): print(row["time"], row["msg"]) + + def find_process(self, process_id): + sql = """ + SELECT * + FROM processes + WHERE process_id='{process_id}'; + """.format( + process_id=process_id + ) + return self.query(sql) + + def query_streams(self, begin, end, limit, process_id=None, tag_filter=None): + conditions = [] + if process_id is not None: + conditions.append("process_id='{process_id}'".format(process_id=process_id)) + if tag_filter is not None: + conditions.append( + "(array_position(tags, '{tag}') is not NULL)".format(tag=tag_filter) + ) + where = "" + if len(conditions) > 0: + where = "WHERE " + " AND ".join(conditions) + sql = """ + SELECT * + FROM streams + {where} + LIMIT {limit}; + """.format( + where=where, limit=limit + ) + return self.query(sql, begin, end) + + def query_blocks(self, begin, end, limit, stream_id): + sql = """ + SELECT * + FROM blocks + WHERE stream_id='{stream_id}' + LIMIT {limit}; + """.format( + limit=limit,stream_id=stream_id + ) + return self.query(sql, begin, end) + + def query_spans(self, begin, end, limit, stream_id): + sql = """ + SELECT * + FROM view_instance('thread_spans', '{stream_id}') + LIMIT {limit}; + """.format( + limit=limit,stream_id=stream_id + ) + return self.query(sql, begin, end) +