-
Notifications
You must be signed in to change notification settings - Fork 500
/
test_specific_rules.py
218 lines (189 loc) · 10.3 KB
/
test_specific_rules.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import unittest
from copy import deepcopy
from pathlib import Path
import eql.ast
from semver import Version
import kql
from detection_rules.integrations import (
find_latest_compatible_version,
load_integrations_manifests,
load_integrations_schemas,
)
from detection_rules import ecs
from detection_rules.config import load_current_package_version
from detection_rules.packaging import current_stack_version
from detection_rules.rule import QueryValidator
from detection_rules.rule_loader import RuleCollection
from detection_rules.schemas import get_stack_schemas
from detection_rules.utils import get_path, load_rule_contents
from .base import BaseRuleTest
# Stack version reported by current_stack_version(), parsed once at import time.
# optional_minor_and_patch=True lets a shortened version string (e.g. "8.3") parse.
# The unittest.skipIf decorators in this module compare against this value to
# gate tests that only apply from a given stack version onward.
PACKAGE_STACK_VERSION = Version.parse(current_stack_version(), optional_minor_and_patch=True)
class TestEndpointQuery(BaseRuleTest):
    """Test endpoint-specific rules."""

    @unittest.skipIf(
        PACKAGE_STACK_VERSION < Version.parse("8.3.0"),
        "Test only applicable to 8.3+ stacks since query updates are min_stacked at 8.3.0",
    )
    def test_os_and_platform_in_query(self):
        """Test that all endpoint rules reference ``host.os.type`` in their query.

        Only EQL and KQL ("kuery") rules stored under a ``windows``, ``macos`` or
        ``linux`` directory are checked. Windows rules that reference at least one
        ``winlog.*`` field are exempt. The additional ``host.os.platform`` check
        for linux rules is currently disabled (see the commented-out code below).
        """
        for rule in self.all_rules:
            if rule.contents.data.get('language') not in ('eql', 'kuery'):
                continue

            platform_dir = rule.path.parent.name
            if platform_dir not in ("windows", "macos", "linux"):
                # skip cross-platform for now
                continue

            ast = rule.contents.data.ast
            fields = [str(f) for f in ast if isinstance(f, (kql.ast.Field, eql.ast.Field))]

            if "host.os.type" in fields:
                continue

            # Exception for Forwarded Events which contain Windows-only fields.
            if platform_dir == "windows" and any(field.startswith("winlog.") for field in fields):
                continue

            # Reaching this point means the field is missing and no exemption applies;
            # assertIn is kept (rather than self.fail) so the failure output lists the
            # fields that were actually found in the query.
            err_msg = f"{self.rule_str(rule)} missing required field for endpoint rule"
            self.assertIn("host.os.type", fields, err_msg)

            # going to bypass this for now
            # if rule.path.parent.name == 'linux':
            #     err_msg = f'{self.rule_str(rule)} missing required field for linux endpoint rule'
            #     self.assertIn('host.os.platform', fields, err_msg)
class TestNewTerms(BaseRuleTest):
    """Test new term rules."""

    @staticmethod
    def _effective_min_stack(rule, current_package_version):
        """Return the stack version a rule is effectively validated against.

        This is the rule's declared ``min_stack_version`` metadata when it is set
        and not older than ``current_package_version``; otherwise it is
        ``current_package_version``.
        """
        declared = rule.contents.metadata.get("min_stack_version")
        if not declared:
            return current_package_version

        declared_version = Version.parse(declared)
        if declared_version < current_package_version:
            return current_package_version
        return declared_version

    @unittest.skipIf(
        PACKAGE_STACK_VERSION < Version.parse("8.4.0"), "Test only applicable to 8.4+ stacks for new terms feature."
    )
    def test_history_window_start(self):
        """Test new terms history window start field."""
        for rule in self.all_rules:
            if rule.contents.data.type != "new_terms":
                continue

            # validate history window start field exists and is correct
            history_window_start = rule.contents.data.new_terms.history_window_start
            assert history_window_start, "new terms field found with no history_window_start field defined"
            assert (
                history_window_start[0].field == "history_window_start"
            ), f"{history_window_start} should be 'history_window_start'"

    @unittest.skipIf(
        PACKAGE_STACK_VERSION < Version.parse("8.4.0"), "Test only applicable to 8.4+ stacks for new terms feature."
    )
    def test_new_terms_field_exists(self):
        """Test that the new terms field of every new_terms rule is named 'new_terms_fields'."""
        for rule in self.all_rules:
            if rule.contents.data.type != "new_terms":
                continue

            new_terms_field_name = rule.contents.data.new_terms.field
            assert (
                new_terms_field_name == "new_terms_fields"
            ), f"{new_terms_field_name} should be 'new_terms_fields' for new_terms rule type"

    @unittest.skipIf(
        PACKAGE_STACK_VERSION < Version.parse("8.4.0"), "Test only applicable to 8.4+ stacks for new terms feature."
    )
    def test_new_terms_fields(self):
        """Test new terms fields are schema validated.

        For each new_terms rule, a combined field schema is assembled from the
        beats/ECS schema for the rule's effective stack version, the schema of each
        index the rule targets, and the latest compatible schema of each integration
        listed in the rule metadata. Every new terms field must be a key in it.
        """
        feature_min_stack = Version.parse("8.4.0")

        # None of these depend on the rule being inspected, so load them once
        # up front instead of once per rule.
        current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
        stack_schemas = get_stack_schemas()
        integration_manifests = load_integrations_manifests()
        integration_schemas = load_integrations_schemas()

        for rule in self.all_rules:
            if rule.contents.data.type != "new_terms":
                continue

            min_stack_version = self._effective_min_stack(rule, current_package_version)
            assert (
                min_stack_version >= feature_min_stack
            ), f"New Terms rule types only compatible with {feature_min_stack}+"

            stack_schema = stack_schemas[str(min_stack_version)]
            ecs_version = stack_schema["ecs"]
            beats_version = stack_schema["beats"]

            # checks if new terms field(s) are in ecs, beats non-ecs or integration schemas
            queryvalidator = QueryValidator(rule.contents.data.query)
            _, _, schema = queryvalidator.get_beats_schema([], beats_version, ecs_version)

            for index_name in rule.contents.data.index:
                schema.update(**ecs.flatten(ecs.get_index_schema(index_name)))

            for tag in rule.contents.metadata.get("integration") or ():
                latest_tag_compat_ver, _ = find_latest_compatible_version(
                    package=tag,
                    integration="",
                    rule_stack_version=min_stack_version,
                    packages_manifest=integration_manifests,
                )
                if not latest_tag_compat_ver:
                    continue

                # Merge the fields of every policy template of the integration.
                for policy_template_schema in integration_schemas[tag][latest_tag_compat_ver].values():
                    schema.update(**policy_template_schema)

            for new_terms_field in rule.contents.data.new_terms.value:
                assert new_terms_field in schema, f"{new_terms_field} not found in ECS, Beats, or non-ecs schemas"

    @unittest.skipIf(
        PACKAGE_STACK_VERSION < Version.parse("8.4.0"), "Test only applicable to 8.4+ stacks for new terms feature."
    )
    def test_new_terms_max_limit(self):
        """Test new terms max limit."""
        # validates length of new_terms to stack version - https://github.com/elastic/kibana/issues/142862
        feature_min_stack = Version.parse("8.4.0")
        feature_min_stack_extended_fields = Version.parse("8.6.0")
        current_package_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)

        for rule in self.all_rules:
            if rule.contents.data.type != "new_terms":
                continue

            min_stack_version = self._effective_min_stack(rule, current_package_version)
            if feature_min_stack <= min_stack_version < feature_min_stack_extended_fields:
                assert (
                    len(rule.contents.data.new_terms.value) == 1
                ), f"new terms have a max limit of 1 for stack versions below {feature_min_stack_extended_fields}"

    @unittest.skipIf(
        PACKAGE_STACK_VERSION < Version.parse("8.6.0"),
        "Test only applicable to 8.6+ stacks, where new terms rules may define multiple fields.",
    )
    def test_new_terms_fields_unique(self):
        """Test new terms fields are unique."""
        # validate fields are unique
        for rule in self.all_rules:
            if rule.contents.data.type != "new_terms":
                continue

            new_terms_values = rule.contents.data.new_terms.value
            assert len(set(new_terms_values)) == len(
                new_terms_values
            ), f"new terms fields values are not unique - {new_terms_values}"
class TestESQLRules(BaseRuleTest):
    """Test ESQL Rules."""

    def run_esql_test(self, esql_query, expectation, message):
        """Load the dummy production rule with its query replaced, inside ``expectation``.

        Args:
            esql_query: Query string written into the rule's ``rule.query`` entry.
            expectation: Context manager wrapped around the load. The disabled cases
                in ``test_esql_queries`` pass ``pytest.raises(...)`` or
                ``does_not_raise()``.
            message: Value assigned to ``expectation.match_expr`` before entering the
                context (presumably the pattern the raised error must match;
                confirm against pytest).
        """
        collection = RuleCollection()
        rule_file = Path(get_path("tests", "data", "command_control_dummy_production_rule.toml"))
        loaded_contents = load_rule_contents(rule_file)

        # Work on a deep copy of the loaded contents so that swapping in the query
        # under test never touches the objects returned by the loader.
        rule_under_test = deepcopy(loaded_contents)[0]
        rule_under_test["rule"]["query"] = esql_query

        expectation.match_expr = message
        with expectation:
            collection.load_dict(rule_under_test)

    def test_esql_queries(self):
        """Test ESQL queries."""
        # NOTE: every case below is commented out, so this test currently runs no checks.
        # test_cases = [
        #     # invalid queries
        #     ('from .ds-logs-endpoint.events.process-default-* | wheres process.name like "Microsoft*"',
        #      pytest.raises(marshmallow.exceptions.ValidationError), r"ESQL query failed"),
        #     ('from .ds-logs-endpoint.events.process-default-* | where process.names like "Microsoft*"',
        #      pytest.raises(marshmallow.exceptions.ValidationError), r"ESQL query failed"),
        #
        #     # valid queries
        #     ('from .ds-logs-endpoint.events.process-default-* | where process.name like "Microsoft*"',
        #      does_not_raise(), None),
        # ]
        # for esql_query, expectation, message in test_cases:
        #     self.run_esql_test(esql_query, expectation, message)