Skip to content

Plugins

Jakub Puchała edited this page Apr 5, 2018 · 10 revisions

Writing own queryable sources

Musoq uses conception of schemas and virtual tables. When typing ... from #csv.file('path/to/file.csv') you are refering to schema named csv and parametrized source based on which the virtual table will be created. What steps are required to create own source:

  1. Implement ISchemaTable. It defines what columns table has.
  2. Implement queryable source by inheriting from RowSource.
  3. Implement user defined functions by inheriting from LibraryBase.
  4. Implement schema by inheriting from SchemaBase.
  5. Copy .dlls to appropriate directory.
  6. Edit service .app config file.

All those tasks are fairly easy and we will implement very easy log source to make sure everything is clear and explained in details.

Let's start with something really easy. Simple flat file reader. It will treat file as queryable source, read it's lines one by one and do something usefull with it. Our table will consist of two columns, LineNumber and Line. The first column is just pointer to line in file. The second one is line from that file.

Implement ISchemaTable

After that short introduction, let's do some implementation. First one is ISchemaTable. This is interface that defines what columns your source has. Based on that, evaluator will be able to access proper column from the source, will know it's name, type and index. Ok, so before we start implement table, we will need to create some helper classes. Look at it below:

public static class FlatFileHelper
{
    public static readonly IDictionary<string, int> FlatNameToIndexMap;
    public static readonly IDictionary<int, Func<FlatFileEntity, object>> FlatIndexToMethodAccessMap;
    public static readonly ISchemaColumn[] FlatColumns;

    static FlatFileHelper()
    {
        FlatNameToIndexMap = new Dictionary<string, int>
        {
            {nameof(FlatFileEntity.LineNumber), 0},
            {nameof(FlatFileEntity.Line), 1}
        };

        FlatIndexToMethodAccessMap = new Dictionary<int, Func<FlatFileEntity, object>>
        {
            {0, info => info.LineNumber},
            {1, info => info.Line}
        };

        FlatColumns = new ISchemaColumn[]
        {
            new SchemaColumn(nameof(FlatFileEntity.LineNumber), 0, typeof(int)),
            new SchemaColumn(nameof(FlatFileEntity.Line), 1, typeof(string))
        };
    }
}

What this class do are three things:

  1. Defining columns in variable FlatColumns. As you can see, it just instantiate array of columns, their name, indexes and type of stored values.
  2. Define FlatNameToIndexMap that remap your column name to specifix index.
  3. Define FlatIndexToMethodAccessMap that points how to access specific property from the source.

There is one thing that is not mentioned here. what is FlatFileEntity in public static readonly IDictionary<int, Func<FlatFileEntity, object>> FlatIndexToMethodAccessMap. Answer is, the FlatFileEntity type holds single row from source. In our case, it will contain a single line from file and line number. After we have this helper class, We can move forward and implement proper table.

The easiest one implementation is really short and doesn't need explanation. It just assigns columns from helper class to our table instance.

public class FlatFileTable : ISchemaTable
{
    public ISchemaColumn[] Columns { get; } = FlatFileHelper.FlatColumns;
}

As you see, there is no any file accessing here, so where do We access our file? The answer is not here. This class just define metadata about your source. We will implement it for a little bit below.

Implement queryable source

Row source for your queries are the most robust code you will need to type and as you will see, it's complexity will depends mostly from how hard is to access row sources for your use case. In case of this tutorial, it won't be hard. Basic approach proposed by this framework is to use chunked source load. Chunked load is based on loading file in parts and process only those loaded parts, not whole source so the processor doesn't have to wait for the file to be loaded completely.

To implement flat file source, we will use RowSourceBase<FlatFileEntity> that has already implemented whole logic for loading sources and is able to pass those chunks to evaluator immediately. The only what we will need to implement is reading file line by line. To do this, we have override method CollectChunks. Let's see how the template looks like:

public class FlatFileSource : RowSourceBase<FlatFileEntity>
{
    private readonly string _filePath;

    public FlatFileSource(string filePath)
    {
        _filePath = filePath;
    }

    protected override void CollectChunks(BlockingCollection<IReadOnlyList<EntityResolver<FlatFileEntity>>> chunkedSource)
    {
       .... implementation here.
    }
}

The only parameter that is required in this case is to have path to file. Ok, let's implement file reading then. The first line we will type is:

const int chunkSize = 999;

which defines how big can be single chunk. If the file has 1050 lines, there will be two chunks, first with 1000 rows and the second one with 50 rows. Going further, we have to check if the file exists so:

if(!File.Exists(_filePath)) return;

If file doesn't exists, I will just consider it as empty file so no rows included.

var rowNum = 0;

using (var file = File.OpenRead(_filePath))
{
    using (var reader = new StreamReader(file))
    {
        var list = new List<EntityResolver<FlatFileEntity>>();

        while (!reader.EndOfStream)
        {
            var line = reader.ReadLine();
            var entity = new FlatFileEntity()
            {
                Line = line,
                LineNumber = ++rowNum
            };

            list.Add(new EntityResolver<FlatFileEntity>(entity, FlatFileHelper.FlatNameToIndexMap, FlatFileHelper.FlatIndexToMethodAccessMap));

            if (rowNum <= chunkSize)
                continue;

            rowNum = 0;
            chunkedSource.Add(list);

            list = new List<EntityResolver<FlatFileEntity>>(chunkSize);
        }

        chunkedSource.Add(list);
    }
}

Further code opens file in read mode and start reading it line by line until rowNum reach appropiate value. If the value exceed it, chunk is ready to be passed to evaluator which is done by chunkedSource.Add(list). New chunk will be used for the next values.

A little explanation is required within the line where new EntityResolver<FlatFileEntity>(entity, FlatFileHelper.FlatNameToIndexMap, FlatFileHelper.FlatIndexToMethodAccessMap) code reside. We defined earlier in the helper class how to access column in our source. This is the place where we need to use it. The EntityResolver<T> will use this information every time the evaluator will try access some specific column. Make sure your types are identical as in source, it's enforced to use the same type as in helper class. If something will be mistyped, the exception will be thrown.

Implement schema

Schema logically groups tables. Good example of it can be the git plugin that contains three tables: Commits, Tags and Branches. For our case, we will implement flat file schema. This schema will contains only one table called FlatFileTable which we have already implemented. When you implement your own schema, you basically, have to return three things. First one is table metadata, second one is row source for that table and the third one is object with custom methods usable for the schema. Let's look how it looks like:

public class FlatFileSchema : SchemaBase
{
    private const string SchemaName = "FlatFile";

    public FlatFileSchema() 
        : base(SchemaName, CreateLibrary())
    {
    }

    public override ISchemaTable GetTableByName(string name, string[] parameters)
    {
        switch (name.ToLowerInvariant())
        {
            case "file":
                return new FlatFileTable();
        }

        throw new TableNotFoundException(nameof(name));
    }

    public override RowSource GetRowSource(string name, string[] parameters)
    {
        switch (name.ToLowerInvariant())
        {
            case "file":
                return new FlatFileSource(parameters[0]);
        }

        throw new SourceNotFoundException(nameof(name));
    }

    private static MethodsAggregator CreateLibrary()
    {
        var methodsManager = new MethodsManager();
        var propertiesManager = new PropertiesManager();

        var library = new FlatFileLibrary();

        methodsManager.RegisterLibraries(library);
        propertiesManager.RegisterProperties(library);

        return new MethodsAggregator(methodsManager, propertiesManager);
    }
}
Clone this wiki locally