diff --git a/include/operator.h b/include/operator.h index 5a8c09e..c0c11ba 100644 --- a/include/operator.h +++ b/include/operator.h @@ -38,7 +38,6 @@ typedef struct SelectParams { typedef struct WindowParams { uint8_t size; uint8_t step; - bool *quit; source_t *source; } window_params_t; diff --git a/include/query.h b/include/query.h index f08ec77..5d1cee1 100644 --- a/include/query.h +++ b/include/query.h @@ -13,7 +13,6 @@ typedef struct Query { struct Operator *root; - bool quit; } query_t; typedef struct { @@ -23,7 +22,7 @@ typedef struct { } operator_thread_arg_t; -void execute_query(const query_t *query, const source_t *source, sink_t *sink); +void execute_query(const query_t *query, sink_t *sink); void join_triple_copy(const data_t *src1, uint32_t index1, const data_t *src2, uint32_t index2, data_t *dest); diff --git a/include/source.h b/include/source.h index f5220dc..df52b7e 100644 --- a/include/source.h +++ b/include/source.h @@ -8,7 +8,7 @@ // Created a source to enable other sources than generator e.g. network typedef struct Source { data_t buffer; - uint8_t index; + uint32_t index; uint8_t consumers; uint8_t consumed; data_t* (*get_next)(const struct Source *self, const uint8_t size, const uint8_t step); diff --git a/src/query.c b/src/query.c index 41445cb..dbe8b5a 100644 --- a/src/query.c +++ b/src/query.c @@ -6,7 +6,6 @@ #include #include -#include /// The join operator @@ -60,10 +59,8 @@ bool window(data_t *out, const window_params_t params) { data_t* data = params.source->get_next(params.source, params.size, params.step); - if (data == NULL) { - *params.quit = true; + if (data == NULL) return false; - } *out = *data; free(data); @@ -173,13 +170,11 @@ bool execute_operator(const operator_t *operator, const data_t *in, data_t *out) /// @param query The query to be executed /// @param source The source creating the input stream /// @param sink The sink consuming the output stream -void execute_query(const query_t *query, const source_t *source, sink_t *sink) +void execute_query(const query_t *query, sink_t *sink) { data_t data = {NULL, 0, 1}; - (void)source; - while (!query->quit) { - execute_operator(query->root, &data, &data); + while (execute_operator(query->root, &data, &data)) { sink->push_next(sink, &data); } } diff --git a/tests/dataTests.cpp b/tests/dataTests.cpp index bc87659..01fd033 100644 --- a/tests/dataTests.cpp +++ b/tests/dataTests.cpp @@ -163,6 +163,8 @@ TEST(DataTests, test_file_source) free(next_gdata); free(next_fdata); + ASSERT_EQ(fsource->get_next(fsource, 36, 36), nullptr); + free_generator_source(gsource); free_file_source(fsource); free(gsink); // not the normal because it is still the array allocated in the lib @@ -202,6 +204,8 @@ TEST(DataTests, test_file_source_inc) free(next_gdata); free(next_fdata); + ASSERT_EQ(fsource->get_next(fsource, increment, increment), nullptr); + free_generator_source(gsource); free_file_source(fsource); free(gsink); // not the normal because it is still the array allocated int he lib diff --git a/tests/queryTests.cpp b/tests/queryTests.cpp index 5fe1895..253da1b 100644 --- a/tests/queryTests.cpp +++ b/tests/queryTests.cpp @@ -54,10 +54,10 @@ TEST_F(QueryTestFixture, test_query_join_death_no_children) .right = nullptr, .params = {.join = {.size = 1, .checks = conditions}} }; - query_t query_join = {.root = &join_op, .quit = false}; + query_t query_join = {.root = &join_op}; // Program should abort because .left and .right are NULL - ASSERT_DEATH(execute_query(&query_join, gsource, gsink), ""); + ASSERT_DEATH(execute_query(&query_join, gsink), ""); } TEST_F(QueryTestFixture, test_query_join_death_no_left_child) @@ -70,10 +70,10 @@ TEST_F(QueryTestFixture, test_query_join_death_no_left_child) .right = &join_op, .params = {.join = {.size = 1, .checks = conditions}} }; - query_t query_join = {.root = &join_op, .quit = false}; + query_t query_join = {.root = &join_op}; // Program should abort because .left is NULL - ASSERT_DEATH(execute_query(&query_join, gsource, gsink), ""); + ASSERT_DEATH(execute_query(&query_join, gsink), ""); } TEST_F(QueryTestFixture, test_query_join_death_no_right_child) @@ -86,10 +86,10 @@ TEST_F(QueryTestFixture, test_query_join_death_no_right_child) .right = nullptr, .params = {.join = {.size = 1, .checks = conditions}} }; - query_t query_join = {.root = &join_op, .quit = false}; + query_t query_join = {.root = &join_op}; // Program should abort because .right is NULL - ASSERT_DEATH(execute_query(&query_join, gsource, gsink), ""); + ASSERT_DEATH(execute_query(&query_join, gsink), ""); } @@ -100,7 +100,7 @@ bool check_filter(const triple_t in) TEST_F(QueryTestFixture, test_query_filter) { - window_params_t wparams = {36, 36, &gquery.quit, gsource}; + window_params_t wparams = {36, 36, gsource}; operator_t window_op = { .type = WINDOW, .left = nullptr, @@ -116,9 +116,9 @@ TEST_F(QueryTestFixture, test_query_filter) .params = {.filter = {.size = 1, .checks = conditions}} }; - gquery = {.root = &filter_op, .quit = false}; + gquery = {.root = &filter_op}; - execute_query(&gquery, gsource, gsink); + execute_query(&gquery, gsink); triple_t expected[8] = { {SUBJECT_ALICE, PREDICATE_HAS_SKILL, OBJECT_PROGRAMMING}, {SUBJECT_BOB, PREDICATE_HAS_SKILL, OBJECT_DATA_ANALYSIS}, @@ -139,7 +139,7 @@ bool check_filter2(const triple_t in) TEST_F(QueryTestFixture, test_query_filter2) { - window_params_t wparams = {36, 36, &gquery.quit, gsource}; + window_params_t wparams = {36, 36, gsource}; operator_t window_op = { .type = WINDOW, .left = nullptr, @@ -163,9 +163,9 @@ TEST_F(QueryTestFixture, test_query_filter2) .params = {.filter = {.size = 1, .checks = conditions2}} }; - gquery = {.root = &filter_op2, .quit = false}; + gquery = {.root = &filter_op2}; - execute_query(&gquery, gsource, gsink); + execute_query(&gquery, gsink); triple_t expected[4] = { {SUBJECT_ALICE, PREDICATE_HAS_SKILL, OBJECT_PROGRAMMING}, {SUBJECT_CHARLIE, PREDICATE_HAS_SKILL, OBJECT_PROGRAMMING}, @@ -184,7 +184,7 @@ bool check_filter7(const triple_t in) TEST_F(QueryTestFixture, test_query_filter3) { - window_params_t wparams = {36, 36, &gquery.quit, gsource}; + window_params_t wparams = {36, 36, gsource}; operator_t window_op = { .type = WINDOW, .left = nullptr, @@ -216,9 +216,9 @@ TEST_F(QueryTestFixture, test_query_filter3) .params = {.filter = {.size = 1, .checks = conditions3}} }; - gquery = {.root = &filter_op3, .quit = false}; + gquery = {.root = &filter_op3}; - execute_query(&gquery, gsource, gsink); + execute_query(&gquery, gsink); triple_t expected[1] = { {SUBJECT_ALICE, PREDICATE_HAS_SKILL, OBJECT_PROGRAMMING}, }; @@ -299,7 +299,7 @@ bool check_filter3(const triple_t in) TEST_F(QueryTestFixture, test_query_join) { source_set_comsumers(gsource, 2); - window_params_t wparams = {36, 36, &gquery.quit, gsource}; + window_params_t wparams = {36, 36, gsource}; operator_t window_op = { .type = WINDOW, .left = nullptr, @@ -332,9 +332,9 @@ TEST_F(QueryTestFixture, test_query_join) .params = {.join = {.size = 1, .checks = conditions3}} }; - gquery = {.root = &join_op, .quit = false}; + gquery = {.root = &join_op}; - execute_query(&gquery, gsource, gsink); + execute_query(&gquery, gsink); triple_t expected[10] = { {SUBJECT_ALICE, PREDICATE_HAS_SKILL, OBJECT_PROGRAMMING}, @@ -359,7 +359,7 @@ TEST_F(QueryTestFixture, test_query_join) TEST_F(QueryTestFixture, test_query_select) { source_set_comsumers(gsource, 2); - window_params_t wparams = {36, 36, &gquery.quit, gsource}; + window_params_t wparams = {36, 36, gsource}; operator_t window_op = { .type = WINDOW, .left = nullptr, @@ -400,9 +400,9 @@ TEST_F(QueryTestFixture, test_query_select) .params = {.select = {.size = 1, .colums = predicates}} }; - gquery = {.root = &select_op, .quit = false}; + gquery = {.root = &select_op}; - execute_query(&gquery, gsource, gsink); + execute_query(&gquery, gsink); triple_t expected[5] = { {SUBJECT_ALICE, PREDICATE_HAS_SKILL, OBJECT_PROGRAMMING}, @@ -422,7 +422,7 @@ TEST_F(QueryTestFixture, test_query_select) TEST_F(QueryTestFixture, test_query_select2) { source_set_comsumers(gsource, 2); - window_params_t wparams = {36, 36, &gquery.quit, gsource}; + window_params_t wparams = {36, 36, gsource}; operator_t window_op = { .type = WINDOW, .left = nullptr, @@ -463,9 +463,9 @@ TEST_F(QueryTestFixture, test_query_select2) .params = {.select = {.size = 2, .colums = predicates}} }; - gquery = {.root = &select_op, .quit = false}; + gquery = {.root = &select_op}; - execute_query(&gquery, gsource, gsink); + execute_query(&gquery, gsink); triple_t expected[10] = { {SUBJECT_ALICE, PREDICATE_HAS_SKILL, OBJECT_PROGRAMMING}, @@ -510,7 +510,7 @@ bool check_filter5(const triple_t in) TEST_F(QueryTestFixture, test_query_1) { source_set_comsumers(gsource, 3); - window_params_t wparams = {36, 36, &gquery.quit, gsource}; + window_params_t wparams = {36, 36, gsource}; operator_t window_op = { .type = WINDOW, .left = nullptr, @@ -567,9 +567,9 @@ TEST_F(QueryTestFixture, test_query_1) .params = {.filter = {.size = 1, .checks = conditions6}} }; - gquery = {.root = &filter_older, .quit = false}; + gquery = {.root = &filter_older}; - execute_query(&gquery, gsource, gsink); + execute_query(&gquery, gsink); triple_t expected[6] = { {SUBJECT_CHARLIE, PREDICATE_HAS_AGE, 35},