Plugins
Musoq uses the concept of schemas and virtual tables. When you type `... from #csv.file('path/to/file.csv')`, you are referring to the schema named `csv` and to a parametrized source, based on which the virtual table will be created. To create your own source, the following steps are required:
- Implement `ISchemaTable`. It defines what columns the table has.
- Implement a queryable source by inheriting from `RowSource`.
- Implement user-defined functions by inheriting from `LibraryBase`.
- Implement the schema by inheriting from `SchemaBase`.
- Copy the .dlls to the appropriate directory.
- Edit the service's .app config file.
All of these tasks are fairly easy, and to make sure everything is clear and explained in detail, we will implement a very simple log source.
Let's start with something really easy: a simple flat file reader. It will treat a file as a queryable source, read its lines one by one and do something useful with them. Our table will consist of two columns, `LineNumber` and `Line`. The first column is the number of the line in the file; the second one is the content of that line.
After that short introduction, let's do some implementation. The first piece is `ISchemaTable`. This interface defines what columns your source has. Based on it, the evaluator is able to access the proper column of the source and knows its name, type and index. Before we implement the table, though, we need to create a helper class. Look at it below:
```csharp
using System;
using System.Collections.Generic;
// ISchemaColumn and SchemaColumn come from the Musoq schema assemblies.

public static class FlatFileHelper
{
    // Column name -> column index.
    public static readonly IDictionary<string, int> FlatNameToIndexMap;
    // Column index -> how to read that column's value from a single row.
    public static readonly IDictionary<int, Func<FlatFileEntity, object>> FlatIndexToMethodAccessMap;
    // Column metadata: name, index and the type of the stored values.
    public static readonly ISchemaColumn[] FlatColumns;

    static FlatFileHelper()
    {
        FlatNameToIndexMap = new Dictionary<string, int>
        {
            {nameof(FlatFileEntity.LineNumber), 0},
            {nameof(FlatFileEntity.Line), 1}
        };

        FlatIndexToMethodAccessMap = new Dictionary<int, Func<FlatFileEntity, object>>
        {
            {0, info => info.LineNumber},
            {1, info => info.Line}
        };

        FlatColumns = new ISchemaColumn[]
        {
            new SchemaColumn(nameof(FlatFileEntity.LineNumber), 0, typeof(int)),
            new SchemaColumn(nameof(FlatFileEntity.Line), 1, typeof(string))
        };
    }
}
```
This class does three things:
- It defines the columns in the `FlatColumns` variable. As you can see, it just instantiates an array of columns with their names, indexes and the types of the stored values.
- It defines `FlatNameToIndexMap`, which maps each column name to a specific index.
- It defines `FlatIndexToMethodAccessMap`, which specifies, for each column index, how to read the corresponding property from the source row.
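To see how the two maps work together, here is a minimal, self-contained sketch of the lookup an evaluator could perform: the column name is first mapped to an index, and the index then selects the accessor delegate that is invoked on the row. The `ColumnLookup` class and its `ReadColumn` method are hypothetical, and the `FlatFileEntity` here is a stand-in that only assumes the two properties used by the helper class; this is not Musoq's actual resolver.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the tutorial's FlatFileEntity (assumed shape: the two properties used by the helper).
public class FlatFileEntity
{
    public int LineNumber { get; set; }
    public string Line { get; set; } = string.Empty;
}

// Hypothetical helper illustrating name -> index -> accessor resolution.
public static class ColumnLookup
{
    private static readonly IDictionary<string, int> NameToIndex = new Dictionary<string, int>
    {
        {nameof(FlatFileEntity.LineNumber), 0},
        {nameof(FlatFileEntity.Line), 1}
    };

    private static readonly IDictionary<int, Func<FlatFileEntity, object>> IndexToAccess =
        new Dictionary<int, Func<FlatFileEntity, object>>
        {
            {0, entity => entity.LineNumber},
            {1, entity => entity.Line}
        };

    public static object ReadColumn(FlatFileEntity entity, string columnName)
    {
        // Step 1: column name -> column index (throws KeyNotFoundException for unknown names).
        var index = NameToIndex[columnName];
        // Step 2: column index -> accessor delegate, invoked on the row.
        return IndexToAccess[index](entity);
    }
}
```

For a row with `LineNumber = 7` and `Line = "hello"`, `ReadColumn(row, "Line")` returns `"hello"`. Keeping the name lookup and the value access in two separate maps means a column name only has to be resolved to an index once, after which rows can be read by index.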
One thing has not been explained yet: what is `FlatFileEntity` in `public static readonly IDictionary<int, Func<FlatFileEntity, object>> FlatIndexToMethodAccessMap`? The `FlatFileEntity` type holds a single row from the source. In our case, it contains a single line from the file and its line number. With this helper class in place, we can move forward and implement the actual table.
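The tutorial does not show `FlatFileEntity` itself. Judging by the helper class (an `int` column `LineNumber` and a `string` column `Line`), it can be as simple as the class below; treat it as an assumed shape rather than the original code.

```csharp
// Assumed definition of FlatFileEntity: one instance holds one row of the source,
// i.e. a single line of the file together with its 1-based line number.
public class FlatFileEntity
{
    // Matches the typeof(int) declared for the LineNumber column.
    public int LineNumber { get; set; }

    // Matches the typeof(string) declared for the Line column.
    public string Line { get; set; } = string.Empty;
}
```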
The simplest implementation is really short and needs no explanation. It just assigns the columns from the helper class to our table instance.
```csharp
public class FlatFileTable : ISchemaTable
{
    public ISchemaColumn[] Columns { get; } = FlatFileHelper.FlatColumns;
}
```
As you can see, there is no file access here, so where do we access our file? Not here. This class just defines metadata about your source; the file access is implemented a little further below.
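Because the table carries only metadata, an evaluator can answer questions such as "which index and type does the column `Line` have?" without touching the file. The sketch below models that with a hypothetical `ColumnInfo` record standing in for Musoq's `SchemaColumn` (whose members this tutorial does not show) and a hypothetical `FindColumn` helper.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in for SchemaColumn: name, index and CLR type of a column.
public record ColumnInfo(string Name, int Index, Type ColumnType);

public static class TableMetadata
{
    // Same metadata as FlatFileHelper.FlatColumns, expressed with the stand-in record.
    public static readonly ColumnInfo[] Columns =
    {
        new ColumnInfo("LineNumber", 0, typeof(int)),
        new ColumnInfo("Line", 1, typeof(string))
    };

    // Returns the column's metadata, or null if the table has no column with that name.
    public static ColumnInfo? FindColumn(string name) =>
        Columns.FirstOrDefault(c => c.Name == name);
}
```

`FindColumn("Line")` yields index 1 and type `string`, which is exactly the information needed to validate a query before any row is read.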
The row source for your queries is the largest piece of code you will need to write, and as you will see, its complexity depends mostly on how hard it is to access the rows for your use case. In this tutorial, it won't be hard. The basic approach proposed by this framework is chunked loading: the source is loaded in parts and each part is processed as soon as it is loaded, so the processor doesn't have to wait for the whole file to be loaded.
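Chunked loading is essentially a producer/consumer pipeline: the producer adds finished chunks to a `BlockingCollection` while the consumer is already processing the chunks that arrived earlier. The standalone sketch below illustrates the idea with plain strings; it is not Musoq's `RowSourceBase` implementation, and `ChunkPipeline.Run` and its chunk size parameter are purely illustrative.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo of chunked loading; not Musoq's RowSourceBase.
public static class ChunkPipeline
{
    // Returns the sizes of the chunks in the order in which the consumer received them.
    public static List<int> Run(IEnumerable<string> lines, int chunkSize)
    {
        using var chunks = new BlockingCollection<IReadOnlyList<string>>();

        // Producer: groups lines into chunks and publishes each chunk as soon as it is full.
        var producer = Task.Run(() =>
        {
            try
            {
                var current = new List<string>(chunkSize);
                foreach (var line in lines)
                {
                    current.Add(line);
                    if (current.Count < chunkSize) continue;
                    chunks.Add(current);
                    current = new List<string>(chunkSize);
                }
                // The last, possibly smaller chunk.
                if (current.Count > 0) chunks.Add(current);
            }
            finally
            {
                // Tells the consumer that no more chunks will arrive, even if reading failed.
                chunks.CompleteAdding();
            }
        });

        // Consumer: handles chunks while the producer may still be reading.
        var received = new List<int>();
        foreach (var chunk in chunks.GetConsumingEnumerable())
            received.Add(chunk.Count);

        producer.Wait();
        return received;
    }
}
```

With seven lines and a chunk size of 3, the consumer receives chunks of 3, 3 and 1 rows. Calling `CompleteAdding` in a `finally` block ensures the consuming loop ends instead of waiting forever if the producer throws.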
To implement the flat file source, we will use `RowSourceBase<FlatFileEntity>`, which already implements the whole logic for loading sources and passes the chunks to the evaluator immediately. The only thing we need to implement is reading the file line by line. To do this, we have to override the `CollectChunks` method. Let's see what the template looks like:
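```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;

public class FlatFileSource : RowSourceBase<FlatFileEntity>
{
    private readonly string _filePath;

    public FlatFileSource(string filePath)
    {
        _filePath = filePath;
    }

    // Every list added to chunkedSource is passed to the evaluator as one chunk.
    protected override void CollectChunks(BlockingCollection<IReadOnlyList<EntityResolver<FlatFileEntity>>> chunkedSource)
    {
        // ... implementation here.
    }
}
```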
The only parameter required in this case is the path to the file. Now let's implement the file reading. The first line we type is:
```csharp
const int chunkSize = 999;
```
It defines the chunk threshold: a chunk is handed over as soon as it contains more than `chunkSize` rows, so every full chunk holds 1000 rows. If the file has 1050 lines, there will be two chunks, the first with 1000 rows and the second with 50. Next, we have to check whether the file exists:
```csharp
if (!File.Exists(_filePath)) return;
```
If the file doesn't exist, we simply treat it as an empty file, so no rows are produced.
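```csharp
var rowNum = 0;
using (var file = File.OpenRead(_filePath))
using (var reader = new StreamReader(file))
{
    var list = new List<EntityResolver<FlatFileEntity>>(chunkSize + 1);
    while (!reader.EndOfStream)
    {
        var line = reader.ReadLine();
        var entity = new FlatFileEntity
        {
            Line = line,
            // Line numbers keep increasing across chunks.
            LineNumber = ++rowNum
        };
        list.Add(new EntityResolver<FlatFileEntity>(entity, FlatFileHelper.FlatNameToIndexMap, FlatFileHelper.FlatIndexToMethodAccessMap));

        // Keep filling the current chunk until it holds more than chunkSize rows.
        if (list.Count <= chunkSize)
            continue;

        // The chunk is full: hand it to the evaluator and start a new one.
        chunkedSource.Add(list);
        list = new List<EntityResolver<FlatFileEntity>>(chunkSize + 1);
    }

    // Pass the remaining rows, if there are any, as the last chunk.
    if (list.Count > 0)
        chunkedSource.Add(list);
}
```
This code opens the file in read mode and reads it line by line, adding each row to the current chunk. Once the chunk contains more than `chunkSize` rows, it is ready and is passed to the evaluator with `chunkedSource.Add(list)`; a new list is then used for the next rows, and whatever remains after the last line is passed as the final chunk. The line number (`rowNum`) and the chunk fill level (`list.Count`) are tracked separately, so line numbers keep increasing across chunks instead of restarting at 1 in every chunk.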
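To check the chunking arithmetic (1050 lines become 1000 + 50 rows) and that line numbers continue across chunks, the loop can be reproduced without any Musoq types. In this sketch, `LineChunker.Read` is hypothetical: it reads from any `TextReader` and produces `(LineNumber, Line)` tuples in place of `EntityResolver<FlatFileEntity>`, with the same threshold semantics as `chunkSize = 999` above.

```csharp
using System.Collections.Generic;
using System.IO;

public static class LineChunker
{
    // Hypothetical: a chunk is emitted once it holds more than chunkSize rows (1000 for 999).
    public static List<List<(int LineNumber, string Line)>> Read(TextReader reader, int chunkSize = 999)
    {
        var chunks = new List<List<(int LineNumber, string Line)>>();
        var current = new List<(int LineNumber, string Line)>();
        var lineNumber = 0;

        string? line;
        while ((line = reader.ReadLine()) != null)
        {
            // The line number is independent of the chunk, so it never restarts.
            current.Add((++lineNumber, line));
            if (current.Count <= chunkSize) continue;

            chunks.Add(current);
            current = new List<(int LineNumber, string Line)>();
        }

        // The remaining rows form the last, smaller chunk.
        if (current.Count > 0) chunks.Add(current);
        return chunks;
    }
}
```

Feeding it 1050 lines through a `StringReader` yields two chunks of 1000 and 50 rows, and the first row of the second chunk has `LineNumber` 1001.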
The line containing `new EntityResolver<FlatFileEntity>(entity, FlatFileHelper.FlatNameToIndexMap, FlatFileHelper.FlatIndexToMethodAccessMap)` needs a short explanation. Earlier, in the helper class, we defined how to access the columns of our source; this is the place where that definition is used. The `EntityResolver<T>` uses this information every time the evaluator tries to access a specific column. Make sure the types are identical to the ones in the source: the values must have the same types as those declared in the helper class, and if something is mistyped, an exception will be thrown.
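The type rule can be made concrete with a check that compares each declared column type with the runtime type returned by the matching accessor. The sketch below is hypothetical and does not show where or how Musoq performs its own check; it simply throws when, for example, a column declared as `int` actually returns a `string`, which is the kind of mistake described above.

```csharp
using System;
using System.Collections.Generic;

public static class ColumnTypeCheck
{
    // Hypothetical validator: for every column index, the value produced by the accessor
    // must be exactly of the declared type; otherwise an InvalidOperationException is thrown.
    public static void Validate<T>(
        T sample,
        IDictionary<int, Type> declaredTypes,
        IDictionary<int, Func<T, object>> accessors)
    {
        foreach (var (index, expected) in declaredTypes)
        {
            var value = accessors[index](sample);
            if (value != null && value.GetType() != expected)
                throw new InvalidOperationException(
                    $"Column {index} is declared as {expected.Name} but returns {value.GetType().Name}.");
        }
    }
}
```

Running such a check once on a sample row, for instance in a unit test, surfaces a mismatch between the declared columns and the accessors before any query is executed.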