-
Notifications
You must be signed in to change notification settings - Fork 20
Plugins
Musoq uses conception of schemas and virtual tables. When typing ... from #csv.file('path/to/file.csv')
you are refering to schema named csv
and parametrized source based on which the virtual table will be created. What steps are required to create own source:
- Implement
ISchemaTable
. It defines what columns table has. - Implement queryable source by inheriting from
RowSource
. - Implement user defined functions by inheriting from
LibraryBase
. - Implement schema by inheriting from
SchemaBase
. - Copy .dlls to appropriate directory.
- Edit service .app config file.
All those tasks are fairly easy and we will implement very easy log source to make sure everything is clear and explained in details.
Let's start with something really easy. Simple flat file reader. It will treat file as queryable source, read it's lines one by one and do something usefull with it. Our table will consist of two columns, LineNumber
and Line
. The first column is just pointer to line in file. The second one is line from that file.
After that short introduction, let's do some implementation. First one is ISchemaTable
. This is interface that defines what columns your source has. Based on that, evaluator will be able to access proper column from the source, will know it's name, type and index. Ok, so before we start implement table, we will need to create some helper classes. Look at it below:
public static class FlatFileHelper
{
public static readonly IDictionary<string, int> FlatNameToIndexMap;
public static readonly IDictionary<int, Func<FlatFileEntity, object>> FlatIndexToMethodAccessMap;
public static readonly ISchemaColumn[] FlatColumns;
static FlatFileHelper()
{
FlatNameToIndexMap = new Dictionary<string, int>
{
{nameof(FlatFileEntity.LineNumber), 0},
{nameof(FlatFileEntity.Line), 1}
};
FlatIndexToMethodAccessMap = new Dictionary<int, Func<FlatFileEntity, object>>
{
{0, info => info.LineNumber},
{1, info => info.Line}
};
FlatColumns = new ISchemaColumn[]
{
new SchemaColumn(nameof(FlatFileEntity.LineNumber), 0, typeof(int)),
new SchemaColumn(nameof(FlatFileEntity.Line), 1, typeof(string))
};
}
}
What this class do are three things:
- Defining columns in variable
FlatColumns
. As you can see, it just instantiate array of columns, their name, indexes and type of stored values. - Define
FlatNameToIndexMap
that remap your column name to specifix index. - Define
FlatIndexToMethodAccessMap
that points how to access specific property from the source.
There is one thing that is not mentioned here. what is FlatFileEntity
in public static readonly IDictionary<int, Func<FlatFileEntity, object>> FlatIndexToMethodAccessMap
. Answer is, the FlatFileEntity
type holds single row from source. In our case, it will contain a single line from file and line number. After we have this helper class, We can move forward and implement proper table.
The easiest one implementation is really short and doesn't need explanation. It just assigns columns from helper class to our table instance.
public class FlatFileTable : ISchemaTable
{
public ISchemaColumn[] Columns { get; } = FlatFileHelper.FlatColumns;
}
As you see, there is no any file accessing here, so where do We access our file? The answer is not here. This class just define metadata about your source. We will implement it for a little bit below.
Row source for your queries are the most robust code you will need to type and as you will see, it's complexity will depends mostly from how hard is to access row sources for your use case. In case of this tutorial, it won't be hard. Basic approach proposed by this framework is to use chunked source load. Chunked load is based on loading file in parts and process only those loaded parts, not whole source so the processor doesn't have to wait for the file to be loaded completely.
To implement flat file source, we will use RowSourceBase<FlatFileEntity>
that has already implemented whole logic for loading sources and is able to pass those chunks to evaluator immediately. The only what we will need to implement is reading file line by line. To do this, we have override method CollectChunks
. Let's see how the template looks like:
public class FlatFileSource : RowSourceBase<FlatFileEntity>
{
private readonly string _filePath;
public FlatFileSource(string filePath)
{
_filePath = filePath;
}
protected override void CollectChunks(BlockingCollection<IReadOnlyList<EntityResolver<FlatFileEntity>>> chunkedSource)
{
.... implementation here.
}
}
The only parameter that is required in this case is to have path to file. Ok, let's implement file reading then. The first line we will type is:
const int chunkSize = 999;
which defines how big can be single chunk. If the file has 1050 lines, there will be two chunks, first with 1000 rows and the second one with 50 rows. Going further, we have to check if the file exists so:
if(!File.Exists(_filePath)) return;
If file doesn't exists, I will just consider it as empty file so no rows included.
var rowNum = 0;
using (var file = File.OpenRead(_filePath))
{
using (var reader = new StreamReader(file))
{
var list = new List<EntityResolver<FlatFileEntity>>();
while (!reader.EndOfStream)
{
var line = reader.ReadLine();
var entity = new FlatFileEntity()
{
Line = line,
LineNumber = ++rowNum
};
list.Add(new EntityResolver<FlatFileEntity>(entity, FlatFileHelper.FlatNameToIndexMap, FlatFileHelper.FlatIndexToMethodAccessMap));
if (rowNum <= chunkSize)
continue;
rowNum = 0;
chunkedSource.Add(list);
list = new List<EntityResolver<FlatFileEntity>>(chunkSize);
}
chunkedSource.Add(list);
}
}
Further code opens file in read mode and start reading it line by line until rowNum
reach appropiate value. If the value exceed it, chunk is ready to be passed to evaluator which is done by chunkedSource.Add(list)
. New chunk will be used for the next values.
A little explanation is required within the line where new EntityResolver<FlatFileEntity>(entity, FlatFileHelper.FlatNameToIndexMap, FlatFileHelper.FlatIndexToMethodAccessMap)
code reside. We defined earlier in the helper class how to access column in our source. This is the place where we need to use it. The EntityResolver<T>
will use this information every time the evaluator will try access some specific column. Make sure your types are identical as in source, it's enforced to use the same type as in helper class. If something will be mistyped, the exception will be thrown.
You can use custom defined functions by implementing it in class that inherits from LibraryBase
. Those class contains basic methods that are shared amongs all schemas. By Inheriting from it, you can add own custom methods that are suitable for particular schema. For example, file plugin, would contain method that calculates hash function on currently processed file. It would looks like select Sha1() from #disk.files('path/to/folder')
. Method Sha1 is vital only for files so that it should be stored inside #disk
schema.
Implementing own function can be splitted to such steps:
- Inherit from
LibraryBase
- Implement your methods
- Annotate your methods with one of available annotation.
BindableMethod
is for casual user defined functions,AggregateGetMethod
andAggregateSetMethod
are for custom grouping operators.
and that's all you have to do. For more details, you can look how LibraryBase
had been implemented. You will easily infer what you should follow in your implementation LibraryBase.cs.
Schema logically groups tables. Good example of it can be the git plugin that contains three tables: Commits
, Tags
and Branches
. For our case, we will implement flat file schema. This schema will contains only one table called FlatFileTable
which we have already implemented. When you implement your own schema, you basically, have to return three things. First one is table metadata, second one is row source for that table and the third one is object with custom methods usable for the schema. Let's look how it looks like:
public class FlatFileSchema : SchemaBase
{
private const string SchemaName = "FlatFile";
public FlatFileSchema()
: base(SchemaName, CreateLibrary())
{
}
public override ISchemaTable GetTableByName(string name, string[] parameters)
{
switch (name.ToLowerInvariant())
{
case "file":
return new FlatFileTable();
}
throw new TableNotFoundException(nameof(name));
}
public override RowSource GetRowSource(string name, string[] parameters)
{
switch (name.ToLowerInvariant())
{
case "file":
return new FlatFileSource(parameters[0]);
}
throw new SourceNotFoundException(nameof(name));
}
private static MethodsAggregator CreateLibrary()
{
var methodsManager = new MethodsManager();
var propertiesManager = new PropertiesManager();
var library = new FlatFileLibrary();
methodsManager.RegisterLibraries(library);
propertiesManager.RegisterProperties(library);
return new MethodsAggregator(methodsManager, propertiesManager);
}
}
We have overriden here three methods. First one is public override ISchemaTable GetTableByName(string name, string[] parameters)
which returns table metadata based on source name. The second one public override RowSource GetRowSource(string name, string[] parameters)
do the same but instead of table, it loads the source. The last one is invoked in the constructor and create object that contains custom methods for the schema.
Service will load plugins from the Plugins
directory in the execution folder. Put directory FlatFile
with the compiled plugin and all it's dependencies here.
Put the probing path in assembly binding section.
<probing privatePath="Plugins/FlatFile" />
That's all. Now you should be able to use your source by putting from #flatfile.file('path/to/your/file')
.