diff --git a/examples/volve_wells.py b/examples/volve_wells.py index f2483ae..9ee844f 100644 --- a/examples/volve_wells.py +++ b/examples/volve_wells.py @@ -92,8 +92,12 @@ # create a trimesh scene and plot with welleng plotter print("Making a scene and plotting...") -scene = we.mesh.make_trimesh_scene(data) -we.visual.plot(scene) +plt = we.visual.Plotter() +for well in data.values(): + plt.add(well) +plt.show() +# scene = we.mesh.make_trimesh_scene(data) +# we.visual.plot(scene) ########################################################################## # if you wanted to export a transformed scene so that you can, for example diff --git a/tests/test_clearance_iscwsa.py b/tests/test_clearance_iscwsa.py index 069f3f9..94b7035 100644 --- a/tests/test_clearance_iscwsa.py +++ b/tests/test_clearance_iscwsa.py @@ -1,3 +1,4 @@ +import unittest from welleng.survey import Survey, make_survey_header from welleng.clearance import IscwsaClearance import numpy as np @@ -16,7 +17,7 @@ data = json.load(open(filename)) -def generate_surveys(data): +def generate_surveys(self, data=data): # Generate surveys for imported wells surveys = {} @@ -56,74 +57,74 @@ def generate_surveys(data): return surveys -def test_minimize_sf(data=data): - surveys = generate_surveys(data) - reference = surveys["Reference well"] - offset = surveys["09 - well"] - - result = IscwsaClearance(reference, offset, minimize_sf=False) - result_min = IscwsaClearance(reference, offset, minimize_sf=True) - - idx = np.where(result_min.ref.interpolated == False) - - # Check that interpolated survey is not corrupted - for attr in [ - 'azi_grid_rad', 'azi_mag_rad', 'azi_true_rad', 'cov_hla', 'cov_nev', - 'pos_nev', 'pos_xyz', 'md', 'radius' - ]: - assert np.allclose( - getattr(result.ref, attr), getattr(result_min.ref, attr)[idx] - ) - - pass - - for attr in [ - 'Rr', 'calc_hole', 'distance_cc', 'eou_boundary', - 'eou_separation', 'hoz_bearing', 'idx', 'masd', 'off_cov_hla', - 'off_cov_nev', 'off_delta_hlas', 'off_delta_nevs', 'off_pcr', - 'ref_cov_hla', 'ref_cov_nev', 'ref_delta_hlas', 'ref_delta_nevs', - 'ref_nevs', 'ref_pcr', 'sf', 'wellbore_separation' - ]: - # `toolface_bearing` and `trav_cyl_azi_deg` are a bit unstable when - # well paths are parallel. - - assert np.allclose( - getattr(result, attr), getattr(result_min, attr)[idx], - rtol=1e-01, atol=1e-02 - ) - - pass - - -def test_clearance_iscwsa(data=data, rtol=1e-02, atol=1e-03): - surveys = generate_surveys(data) - reference = surveys["Reference well"] - - # Perform clearance checks for each survey - for well in surveys: - if well != "09 - well": - continue - if well == "Reference well": - continue - else: - offset = surveys[well] - # skip well 10 - if well in ["10 - well"]: +class TestClearanceIscwsa(unittest.TestCase): + def test_minimize_sf(self, data=data): + surveys = generate_surveys(data) + reference = surveys["Reference well"] + offset = surveys["09 - well"] + + result = IscwsaClearance(reference, offset, minimize_sf=False) + result_min = IscwsaClearance(reference, offset, minimize_sf=True) + + idx = np.where(result_min.ref.interpolated == False) + + # Check that interpolated survey is not corrupted + for attr in [ + 'azi_grid_rad', 'azi_mag_rad', 'azi_true_rad', 'cov_hla', 'cov_nev', + 'pos_nev', 'pos_xyz', 'md', 'radius' + ]: + assert np.allclose( + getattr(result.ref, attr), getattr(result_min.ref, attr)[idx] + ) + + pass + + for attr in [ + 'Rr', 'calc_hole', 'distance_cc', 'eou_boundary', + 'eou_separation', 'hoz_bearing', 'idx', 'masd', 'off_cov_hla', + 'off_cov_nev', 'off_delta_hlas', 'off_delta_nevs', 'off_pcr', + 'ref_cov_hla', 'ref_cov_nev', 'ref_delta_hlas', 'ref_delta_nevs', + 'ref_nevs', 'ref_pcr', 'sf', 'wellbore_separation' + ]: + # `toolface_bearing` and `trav_cyl_azi_deg` are a bit unstable when + # well paths are parallel. + + assert np.allclose( + getattr(result, attr), getattr(result_min, attr)[idx], + rtol=1e-01, atol=1e-02 + ) + + pass + + def test_clearance_iscwsa(self, data=data, rtol=1e-02, atol=1e-03): + surveys = generate_surveys(data) + reference = surveys["Reference well"] + + # Perform clearance checks for each survey + for well in surveys: + if well != "09 - well": + continue + if well == "Reference well": continue else: - for b in [False, True]: - result = IscwsaClearance(reference, offset, minimize_sf=b) + offset = surveys[well] + # skip well 10 + if well in ["10 - well"]: + continue + else: + for b in [False, True]: + result = IscwsaClearance(reference, offset, minimize_sf=b) + assert np.allclose( + result.sf[np.where(result.ref.interpolated == False)], + np.array(data["wells"][well]["SF"]), + rtol=rtol, atol=atol + ) - assert np.allclose( - result.sf[np.where(result.ref.interpolated == False)], - np.array(data["wells"][well]["SF"]), - rtol=rtol, atol=atol - ) - - pass + pass # make above test runnanble separately if __name__ == '__main__': - test_minimize_sf(data=data) - test_clearance_iscwsa(data=data) + unittest.main() + # test_minimize_sf(data=data) + # test_clearance_iscwsa(data=data) diff --git a/tests/test_connector.py b/tests/test_connector.py index 2418249..b451689 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -1,145 +1,139 @@ import inspect import sys +import unittest + +import numpy as np + from welleng.connector import Connector from welleng.survey import Survey, from_connections -import numpy as np -def test_md_hold(): - # test hold with only md provided - c = Connector( - vec1=[0, 0, 1], - md2=500, - ) - assert ( - c.inc_target == c.inc1 - and c.azi_target == c.azi1 - and c.pos_target[2] == c.md_target - ), "Failed c1" - assert c.method == 'hold', "Unexpected method" - - assert isinstance(from_connections(c), Survey) - - -def test_md_and_vec(): - # test with md2 and vec2 provided (minimum curvature) - c = Connector( - vec1=[0, 0, 1], - md2=1000, - vec2=[0, 1, 0] - ) - assert c.method == 'min_curve' - - -def test_pos(): - # test with pos2 provided (minimum distance) - c = Connector( - vec1=[0, 0, 1], - pos2=[100, 100, 1000], - ) - assert c.md_target > c.pos1[2], "Failed c3" - - -def test_pos_and_dls(): - # test with pos2 needing more aggressive dls (minimum curvature) - c = Connector( - vec1=[0, 0, 1], - pos2=[200, 400, 200] - ) - assert c.method == 'min_curve_to_target' - - -def test_pos_and_vec(): - # test with pos2 and vec2 provided - vec1 = [-1, -1, 1] - vec2 = [1, -1, 0] - c = Connector( - pos1=[0., 0., 0], - vec1=vec1 / np.linalg.norm(vec1), - pos2=[0., 1000., 500.], - vec2=vec2 / np.linalg.norm(vec2), - ) - assert c.method == 'curve_hold_curve' - - # test if interpolator and survey functions are working - assert isinstance(from_connections(c, step=30), Survey) - - -def test_pos_inc_azi(): - # test with pos2, inc1 and azi1 provided - c = Connector( - pos1=[0., 0., 0], - inc1=0., - azi1=90, - pos2=[1000., 1000., 1000.], - vec2=[0., 0., 1.], - ) - assert c.method == 'curve_hold_curve' - - -def test_dls2(): - # test with different dls for second curve section - c = Connector( - pos1=[0., 0., 0], - vec1=[0., 0., 1.], - pos2=[0., 100., 1000.], - vec2=[0., 0., 1.], - dls_design2=5 - ) - assert c.radius_design2 < c.radius_design - - -def test_radius_critical(): - # test with dls_critical requirement (actual dls < dls_design) - c = Connector( - pos1=[0., 0., 0], - vec1=[0., 0., 1.], - pos2=[0., 100., 100.], - vec2=[0., 0., 1.], - ) - assert c.radius_critical < c.radius_design - - -def test_min_curve(): - # test min_curve (inc2 provided) - c = Connector( - pos1=[0., 0., 0], - vec1=[0., 0., 1.], - inc2=30, - ) - assert c.method == 'min_curve' - - -def test_radius_critical_with_min_curve(): - # test min_curve with md less than required radius - c = Connector( - pos1=[0., 0., 0], - inc1=0, - azi1=0, - md2=500, - inc2=90, - azi2=0, - ) - assert c.radius_critical < c.radius_design - - -def one_function_to_run_them_all(): - """ - Function to gather the test functions so that they can be tested by - running this module. - - https://stackoverflow.com/questions/18907712/python-get-list-of-all- - functions-in-current-module-inspecting-current-module - """ - test_functions = [ - obj for name, obj in inspect.getmembers(sys.modules[__name__]) - if (inspect.isfunction(obj) - and name.startswith('test') - and name != 'all') - ] - - [f() for f in test_functions] +class ConnectorTest(unittest.TestCase): + def test_md_hold(self): + # test hold with only md provided + c = Connector( + vec1=[0, 0, 1], + md2=500, + ) + assert ( + c.inc_target == c.inc1 + and c.azi_target == c.azi1 + and c.pos_target[2] == c.md_target + ), "Failed c1" + assert c.method == 'hold', "Unexpected method" + + assert isinstance(from_connections(c), Survey) + + def test_md_and_vec(self): + # test with md2 and vec2 provided (minimum curvature) + c = Connector( + vec1=[0, 0, 1], + md2=1000, + vec2=[0, 1, 0] + ) + assert c.method == 'min_curve' + + def test_pos(self): + # test with pos2 provided (minimum distance) + c = Connector( + vec1=[0, 0, 1], + pos2=[100, 100, 1000], + ) + assert c.md_target > c.pos1[2], "Failed c3" + + def test_pos_and_dls(self): + # test with pos2 needing more aggressive dls (minimum curvature) + c = Connector( + vec1=[0, 0, 1], + pos2=[200, 400, 200] + ) + assert c.method == 'min_curve_to_target' + + def test_pos_and_vec(self): + # test with pos2 and vec2 provided + vec1 = [-1, -1, 1] + vec2 = [1, -1, 0] + c = Connector( + pos1=[0., 0., 0], + vec1=vec1 / np.linalg.norm(vec1), + pos2=[0., 1000., 500.], + vec2=vec2 / np.linalg.norm(vec2), + ) + assert c.method == 'curve_hold_curve' + + # test if interpolator and survey functions are working + assert isinstance(from_connections(c, step=30), Survey) + + def test_pos_inc_azi(self): + # test with pos2, inc1 and azi1 provided + c = Connector( + pos1=[0., 0., 0], + inc1=0., + azi1=90, + pos2=[1000., 1000., 1000.], + vec2=[0., 0., 1.], + ) + assert c.method == 'curve_hold_curve' + + def test_dls2(self): + # test with different dls for second curve section + c = Connector( + pos1=[0., 0., 0], + vec1=[0., 0., 1.], + pos2=[0., 100., 1000.], + vec2=[0., 0., 1.], + dls_design2=5 + ) + assert c.radius_design2 < c.radius_design + + def test_radius_critical(self): + # test with dls_critical requirement (actual dls < dls_design) + c = Connector( + pos1=[0., 0., 0], + vec1=[0., 0., 1.], + pos2=[0., 100., 100.], + vec2=[0., 0., 1.], + ) + assert c.radius_critical < c.radius_design + + def test_min_curve(self): + # test min_curve (inc2 provided) + c = Connector( + pos1=[0., 0., 0], + vec1=[0., 0., 1.], + inc2=30, + ) + assert c.method == 'min_curve' + + def test_radius_critical_with_min_curve(self): + # test min_curve with md less than required radius + c = Connector( + pos1=[0., 0., 0], + inc1=0, + azi1=0, + md2=500, + inc2=90, + azi2=0, + ) + assert c.radius_critical < c.radius_design +# def one_function_to_run_them_all(): +# """ +# Function to gather the test functions so that they can be tested by +# running this module. + +# https://stackoverflow.com/questions/18907712/python-get-list-of-all- +# functions-in-current-module-inspecting-current-module +# """ +# test_functions = [ +# obj for name, obj in inspect.getmembers(sys.modules[__name__]) +# if (inspect.isfunction(obj) +# and name.startswith('test') +# and name != 'all') +# ] + +# [f() for f in test_functions] if __name__ == '__main__': - one_function_to_run_them_all() + unittest.main() + # one_function_to_run_them_all() diff --git a/tests/test_data/error_mwdrev5_iscwsa_validation_results.xlsx b/tests/test_data/error_mwdrev5_iscwsa_validation_results.xlsx index 846d9ce..4e13498 100644 Binary files a/tests/test_data/error_mwdrev5_iscwsa_validation_results.xlsx and b/tests/test_data/error_mwdrev5_iscwsa_validation_results.xlsx differ diff --git a/tests/test_fluid.py b/tests/test_fluid.py index 7afced5..03e581e 100644 --- a/tests/test_fluid.py +++ b/tests/test_fluid.py @@ -1,30 +1,34 @@ +import unittest + from welleng.fluid import Fluid -def test_fluid_density_profile(): - """ - Test that the get_density_profile method in the Fluid class returns the - same value as the example in the SPE 11118 paper. - """ - fluid = Fluid( - fluid_density=10., # ppg - reference_temp=120., # Fahrenheit, - weighting_material='SPE_11118', - base_fluid_water_ratio=0.103, - ) +class FluidTest(unittest.TestCase): + def test_fluid_density_profile(self): + """ + Test that the get_density_profile method in the Fluid class returns the + same value as the example in the SPE 11118 paper. + """ + fluid = Fluid( + fluid_density=10., # ppg + reference_temp=120., # Fahrenheit, + weighting_material='SPE_11118', + base_fluid_water_ratio=0.103, + ) - # Override properties to align with ones provided in the example. - fluid.volume_water_reference_relative = 0.09 - fluid.volume_oil_reference_relative = 0.78 - fluid.volume_weighting_material_relative = 0.11 + # Override properties to align with ones provided in the example. + fluid.volume_water_reference_relative = 0.09 + fluid.volume_oil_reference_relative = 0.78 + fluid.volume_weighting_material_relative = 0.11 - density_profile = fluid.get_density_profile( - depth=10_000., - temperature=250. - ) + density_profile = fluid.get_density_profile( + depth=10_000., + temperature=250. + ) - assert round(density_profile[-1], 2) == 9.85 + assert round(density_profile[-1], 2) == 9.85 if __name__ == '__main__': - test_fluid_density_profile() + unittest.main() + # test_fluid_density_profile() diff --git a/tests/test_iscwsa_mwd_error.py b/tests/test_iscwsa_mwd_error.py index 0bda3e5..04b5082 100644 --- a/tests/test_iscwsa_mwd_error.py +++ b/tests/test_iscwsa_mwd_error.py @@ -1,8 +1,11 @@ import json +import unittest + import numpy as np import pandas as pd -from welleng.utils import get_sigmas + from welleng.survey import Survey, SurveyHeader +from welleng.utils import get_sigmas """ Test that the ISCWSA MWD Rev5 error model is working within a defined @@ -56,69 +59,71 @@ def get_err(error_model, wd): # initiate lists -def test_iscwsa_error_models(input_files=input_files): - for error_model, filename in input_files.items(): - df, err = initiate(error_model, filename) - - nn_c, ee_c, vv_c, ne_c, nv_c, ev_c = [], [], [], [], [], [] - data = [ - nn_c, ee_c, vv_c, ne_c, nv_c, ev_c - ] - - # generate error data - for index, row in df.iterrows(): - i = get_md_index(err, row['md']) - s = row['source'] - if s in ["Totals", "TOTAL"]: - source_cov = err.errors.cov_NEVs.T[i] - else: - source_cov = err.errors.errors[s].cov_NEV.T[i] - v = get_sigmas(source_cov, long=True) - for j, d in enumerate(v): - data[j].append(d[0]) - - # convert to dictionary - ed = {} - headers = [ - 'nn_c', 'ee_c', 'vv_c', 'ne_c', 'nv_c', 'ev_c' - ] - for i, h in enumerate(headers): - ed[h] = data[i] - - df_c = pd.DataFrame(ed) - - df_r = df.join(df_c) - - headers = [ - 'nn_d', 'ee_d', 'vv_d', 'ne_d', 'nv_d', 'ev_d' - ] - df_d = pd.DataFrame( - np.around( - np.array(df_c) - np.array(df.iloc[:, 2:]), - decimals=4 - ), - columns=headers - ) - - df_r = df_r.join(df_d) - - with np.errstate(divide='ignore', invalid='ignore'): - error = np.nan_to_num(np.absolute( - np.array(df_d) / np.array(df.iloc[:, 2:]) - ) * 100 +class IscwsaMwdErroroTest(unittest.TestCase): + def test_iscwsa_error_models(self, input_files=input_files): + for error_model, filename in input_files.items(): + df, err = initiate(error_model, filename) + + nn_c, ee_c, vv_c, ne_c, nv_c, ev_c = [], [], [], [], [], [] + data = [ + nn_c, ee_c, vv_c, ne_c, nv_c, ev_c + ] + + # generate error data + for index, row in df.iterrows(): + i = get_md_index(err, row['md']) + s = row['source'] + if s in ["Totals", "TOTAL"]: + source_cov = err.errors.cov_NEVs.T[i] + else: + source_cov = err.errors.errors[s].cov_NEV.T[i] + v = get_sigmas(source_cov, long=True) + for j, d in enumerate(v): + data[j].append(d[0]) + + # convert to dictionary + ed = {} + headers = [ + 'nn_c', 'ee_c', 'vv_c', 'ne_c', 'nv_c', 'ev_c' + ] + for i, h in enumerate(headers): + ed[h] = data[i] + + df_c = pd.DataFrame(ed) + + df_r = df.join(df_c) + + headers = [ + 'nn_d', 'ee_d', 'vv_d', 'ne_d', 'nv_d', 'ev_d' + ] + df_d = pd.DataFrame( + np.around( + np.array(df_c) - np.array(df.iloc[:, 2:]), + decimals=4 + ), + columns=headers ) - assert np.all(error < TOLERANCE), ( - f"failing error {d}" - ) + df_r = df_r.join(df_d) - # if you wanted to view the results, this would save then to an Excel - # file. - df_r.to_excel( - "tests/test_data/error_mwdrev5_iscwsa_validation_results.xlsx" - ) + with np.errstate(divide='ignore', invalid='ignore'): + error = np.nan_to_num(np.absolute( + np.array(df_d) / np.array(df.iloc[:, 2:]) + ) * 100 + ) + + assert np.all(error < TOLERANCE), ( + f"failing error {d}" + ) + + # if you wanted to view the results, this would save then to an Excel + # file. + df_r.to_excel( + "tests/test_data/error_mwdrev5_iscwsa_validation_results.xlsx" + ) # make above test runnanble separately if __name__ == '__main__': - test_iscwsa_error_models(input_files) + unittest.main() + # test_iscwsa_error_models(input_files) diff --git a/tests/test_survey_interpolate.py b/tests/test_survey_interpolate.py index 6a319c5..a9ceec0 100644 --- a/tests/test_survey_interpolate.py +++ b/tests/test_survey_interpolate.py @@ -1,6 +1,6 @@ +import unittest + import welleng as we -import inspect -import sys survey = we.survey.Survey( md=[0, 500, 1000, 2000, 2500, 3500], @@ -10,59 +10,58 @@ ) -def test_survey_interpolate_survey(step=30): - global survey - survey_interp = we.survey.interpolate_survey(survey, step=step) - assert isinstance(survey_interp, we.survey.Survey) - - survey_interp = survey.interpolate_survey(step=step) - assert isinstance(survey_interp, we.survey.Survey) - - -def test_survey_interpolate_survey_tvd(step=10): - global survey - survey_interp = survey.interpolate_survey(step=30) - survey_interp_tvd = we.survey.interpolate_survey_tvd( - survey_interp, step=step - ) - assert isinstance(survey_interp_tvd, we.survey.Survey) +class SurveyInterpolateTest(unittest.TestCase): + def test_survey_interpolate_survey(self, step=30): + global survey + survey_interp = we.survey.interpolate_survey(survey, step=step) + assert isinstance(survey_interp, we.survey.Survey) - survey_interp_tvd = survey_interp.interpolate_survey_tvd(step=step) - assert isinstance(survey_interp_tvd, we.survey.Survey) + survey_interp = survey.interpolate_survey(step=step) + assert isinstance(survey_interp, we.survey.Survey) + def test_survey_interpolate_survey_tvd(self, step=10): + global survey + survey_interp = survey.interpolate_survey(step=30) + survey_interp_tvd = we.survey.interpolate_survey_tvd( + survey_interp, step=step + ) + assert isinstance(survey_interp_tvd, we.survey.Survey) -def test_interpolate_md(md=800): - global survey - node = survey.interpolate_md(md=md) - assert isinstance(node, we.node.Node) + survey_interp_tvd = survey_interp.interpolate_survey_tvd(step=step) + assert isinstance(survey_interp_tvd, we.survey.Survey) + def test_interpolate_md(self, md=800): + global survey + node = survey.interpolate_md(md=md) + assert isinstance(node, we.node.Node) -def test_interpolate_tvd(tvd=800): - global survey - node = survey.interpolate_tvd(tvd=tvd) - assert isinstance(node, we.node.Node) + def test_interpolate_tvd(self, tvd=800): + global survey + node = survey.interpolate_tvd(tvd=tvd) + assert isinstance(node, we.node.Node) -def one_function_to_run_them_all(): - """ - Function to gather the test functions so that they can be tested by - running this module. +# def one_function_to_run_them_all(): +# """ +# Function to gather the test functions so that they can be tested by +# running this module. - https://stackoverflow.com/questions/18907712/python-get-list-of-all- - functions-in-current-module-inspecting-current-module - """ - test_functions = [ - obj for name, obj in inspect.getmembers(sys.modules[__name__]) - if (inspect.isfunction(obj) - and name.startswith('test') - and name != 'all') - ] +# https://stackoverflow.com/questions/18907712/python-get-list-of-all- +# functions-in-current-module-inspecting-current-module +# """ +# test_functions = [ +# obj for name, obj in inspect.getmembers(sys.modules[__name__]) +# if (inspect.isfunction(obj) +# and name.startswith('test') +# and name != 'all') +# ] - for f in test_functions: - f() +# for f in test_functions: +# f() - pass +# pass if __name__ == '__main__': - one_function_to_run_them_all() + unittest.main() + # one_function_to_run_them_all() diff --git a/tests/test_survey_parameters.py b/tests/test_survey_parameters.py index 0d4fc45..4e12ef4 100644 --- a/tests/test_survey_parameters.py +++ b/tests/test_survey_parameters.py @@ -1,5 +1,4 @@ -import inspect -import sys +import unittest import welleng as we @@ -13,41 +12,43 @@ } -def test_known_location(): - calculator = we.survey.SurveyParameters(reference.get('srs')) - survey_parameters = calculator.get_factors_from_x_y( - x=reference.get('x'), y=reference.get('y'), date=reference.get('date') - ) +class SurveyParamsTest(unittest.TestCase): + def test_known_location(self): + calculator = we.survey.SurveyParameters(reference.get('srs')) + survey_parameters = calculator.get_factors_from_x_y( + x=reference.get('x'), y=reference.get('y'), date=reference.get('date') + ) - for k, v in survey_parameters.items(): - try: - assert round(v, 3) == round(reference.get(k), 3) - except TypeError: - assert v == reference.get(k) + for k, v in survey_parameters.items(): + try: + assert round(v, 3) == round(reference.get(k), 3) + except TypeError: + assert v == reference.get(k) - pass + pass -def one_function_to_run_them_all(): - """ - Function to gather the test functions so that they can be tested by - running this module. +# def one_function_to_run_them_all(): +# """ +# Function to gather the test functions so that they can be tested by +# running this module. - https://stackoverflow.com/questions/18907712/python-get-list-of-all- - functions-in-current-module-inspecting-current-module - """ - test_functions = [ - obj for name, obj in inspect.getmembers(sys.modules[__name__]) - if (inspect.isfunction(obj) - and name.startswith('test') - and name != 'all') - ] +# https://stackoverflow.com/questions/18907712/python-get-list-of-all- +# functions-in-current-module-inspecting-current-module +# """ +# test_functions = [ +# obj for name, obj in inspect.getmembers(sys.modules[__name__]) +# if (inspect.isfunction(obj) +# and name.startswith('test') +# and name != 'all') +# ] - for f in test_functions: - f() +# for f in test_functions: +# f() - pass +# pass if __name__ == '__main__': - one_function_to_run_them_all() + unittest.main() + # one_function_to_run_them_all() diff --git a/tests/test_utils.py b/tests/test_utils.py index e568927..971d337 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,5 +1,6 @@ import inspect import sys +import unittest import numpy as np from numpy.typing import NDArray @@ -41,89 +42,88 @@ def _generate_random_dms(n: int, ndigits: int = None) -> NDArray: ).reshape((-1, 2, 4)) -def test_annular_volume(): - av = annular_volume( - od=ureg('12.25 inch').to('meter'), - id=ureg(f'{9+5/8} inch').to('meter'), - length=ureg('1000 meter') - ) - - assert av.m == 3.491531223156194 - assert str(av.u) == 'meter ** 3' - - pass - - -def test_decimal2dms(): - degrees, minutes, seconds, direction = decimal2dms( - (LAT[0] + LAT[1] / 60 + LAT[2] / 3600, LAT[3]) - ) - assert (degrees, minutes, round(seconds, 4), direction) == LAT - - dms = decimal2dms(np.array([ - (LAT[0] + LAT[1] / 60 + LAT[2] / 3600, LAT[3]), - (LON[0] + LON[1] / 60 + LON[2] / 3600, LON[3]) - ]), ndigits=4) - assert np.all(np.equal( - dms, - np.array((np.array(LAT, dtype=object), np.array(LON, dtype=object))) - )) - - -def test_dms2decimal(): - decimal = dms2decimal((-LAT[0], LAT[1], LAT[2], LAT[3])) # check it handles westerly - assert np.all(np.equal( - decimal, - np.array([ - -(LAT[0] + LAT[1] / 60 + LAT[2] / 3600), - LAT[3] - ], dtype=object) - )) - - decimals = dms2decimal((LAT, LON)) - assert np.all(np.equal( - decimals, - np.array((dms2decimal(LAT), dms2decimal(LON))) - )) - - decimals = dms2decimal(((LAT, LON), (LON, LAT))) - assert np.all(np.equal( - decimals, - np.array(( - (dms2decimal(LAT), dms2decimal(LON)), - (dms2decimal(LON), dms2decimal(LAT)) +class UtilsTest(unittest.TestCase): + def test_annular_volume(self): + av = annular_volume( + od=ureg('12.25 inch').to('meter'), + id=ureg(f'{9+5/8} inch').to('meter'), + length=ureg('1000 meter') + ) + + assert av.m == 3.491531223156194 + assert str(av.u) == 'meter ** 3' + + pass + + def test_decimal2dms(self): + degrees, minutes, seconds, direction = decimal2dms( + (LAT[0] + LAT[1] / 60 + LAT[2] / 3600, LAT[3]) + ) + assert (degrees, minutes, round(seconds, 4), direction) == LAT + + dms = decimal2dms(np.array([ + (LAT[0] + LAT[1] / 60 + LAT[2] / 3600, LAT[3]), + (LON[0] + LON[1] / 60 + LON[2] / 3600, LON[3]) + ]), ndigits=4) + assert np.all(np.equal( + dms, + np.array((np.array(LAT, dtype=object), np.array(LON, dtype=object))) + )) + + def test_dms2decimal(self): + decimal = dms2decimal((-LAT[0], LAT[1], LAT[2], LAT[3])) # check it handles westerly + assert np.all(np.equal( + decimal, + np.array([ + -(LAT[0] + LAT[1] / 60 + LAT[2] / 3600), + LAT[3] + ], dtype=object) )) - )) + decimals = dms2decimal((LAT, LON)) + assert np.all(np.equal( + decimals, + np.array((dms2decimal(LAT), dms2decimal(LON))) + )) -def test_dms2decimal2dms(): - _dms = _generate_random_dms(int(1e3), 8) - decimal = dms2decimal(_dms) - dms = decimal2dms(decimal, 8) + decimals = dms2decimal(((LAT, LON), (LON, LAT))) + assert np.all(np.equal( + decimals, + np.array(( + (dms2decimal(LAT), dms2decimal(LON)), + (dms2decimal(LON), dms2decimal(LAT)) + )) + )) - assert np.all(np.equal(_dms, dms)) + def test_dms2decimal2dms(self): + _dms = _generate_random_dms(int(1e3), 8) + decimal = dms2decimal(_dms) + dms = decimal2dms(decimal, 8) + assert np.all(np.equal(_dms, dms)) -def one_function_to_run_them_all(): - """ - Function to gather the test functions so that they can be tested by - running this module. - https://stackoverflow.com/questions/18907712/python-get-list-of-all- - functions-in-current-module-inspecting-current-module - """ - test_functions = [ - obj for name, obj in inspect.getmembers(sys.modules[__name__]) - if (inspect.isfunction(obj) - and name.startswith('test') - and name != 'all') - ] +# def one_function_to_run_them_all(): +# """ +# Function to gather the test functions so that they can be tested by +# running this module. - for f in test_functions: - f() +# https://stackoverflow.com/questions/18907712/python-get-list-of-all- +# functions-in-current-module-inspecting-current-module +# """ +# test_functions = [ +# obj for name, obj in inspect.getmembers(sys.modules[__name__]) +# if (inspect.isfunction(obj) +# and name.startswith('test') +# and name != 'all') +# ] - pass +# for f in test_functions: +# f() + +# pass if __name__ == '__main__': - one_function_to_run_them_all() + unittest.main() + # one_function_to_run_them_all() diff --git a/welleng/version.py b/welleng/version.py index a62e53d..32a90a3 100644 --- a/welleng/version.py +++ b/welleng/version.py @@ -1 +1 @@ -__version__ = '0.7.8' +__version__ = '0.8.0' diff --git a/welleng/visual.py b/welleng/visual.py index 182bde4..8b2aa21 100644 --- a/welleng/visual.py +++ b/welleng/visual.py @@ -4,11 +4,17 @@ except ImportError: TRIMESH = False try: - from vedo import trimesh2vedo, Lines, Sphere + import vedo + from vedo import Lines, Mesh + from vedo import Plotter as VedoPlotter + from vedo import Point, Text2D, mag, plotter_instance + from vedo.addons import Icon, compute_visible_bounds + from vedo.utils import buildPolyData VEDO = True except ImportError: VEDO = False import numpy as np +from scipy.spatial import KDTree try: import plotly.graph_objects as go @@ -17,220 +23,226 @@ except ImportError: PLOTLY = False -from vtk import ( - vtkCubeAxesActor, vtkNamedColors, vtkInteractorStyleTerrain -) -from vtkmodules.vtkRenderingCore import ( - vtkRenderWindow, vtkRenderWindowInteractor, vtkRenderer -) -from vtkmodules.vtkInteractionWidgets import ( - vtkCameraOrientationWidget, - vtkOrientationMarkerWidget -) +from vtk import vtkAxesActor, vtkCubeAxesActor, vtkNamedColors +from . import mesh from .version import __version__ as VERSION # VEDO = False -class Plotter(vtkRenderer): - def __init__(self, data=None, **kwargs): - super().__init__() +class Plotter(VedoPlotter): + def __init__(self, *args, **kwargs): """ - A vtk wrapper for quickly visualizing well trajectories for QAQC - purposes. Initiates a vtkRenderer instance and populates it with - mesh data and a suitable axes for viewing well trajectory data. - - Parameters - ---------- - data: obj or list(obj) - A trimesh.Trimesh object or a list of trimesh.Trimesh - objects or a trimesh.scene object - names: list of strings (default: None) - A list of names, index aligned to the list of well meshes. - colors: list of strings (default: None) - A list of color or colors. If a single color is listed then this is - applied to all meshes in data, otherwise the list of colors is - indexed to the list of meshes. + Notes + ----- + On account of Z or TVD pointing down in the drilling world, the + coordinate system is right handed. In order to map coordinates in the + NEV (or North, East, Vertical) reference correctly, North coordinates + are plotted on the X axis and East on the Y axis. Be mindful of this + adding objects to a scene. """ - assert all((VEDO, TRIMESH)), \ - "ImportError: try pip install welleng[easy]" - - if data is not None: - names = kwargs.get('names') - - if isinstance(data, trimesh.scene.scene.Scene): - meshes = [v for k, v in data.geometry.items()] - if names is None: - names = list(data.geometry.keys()) - - # handle a single mesh being passed - elif isinstance(data, trimesh.Trimesh): - meshes = [data] - - else: - meshes = data - if names is not None: - assert len(names) == len(data), \ - "Names must be length of meshes list else None" - - colors = kwargs.get('colors') - if colors is not None: - if len(colors) == 1: - colors = colors * len(meshes) - else: - assert len(colors) == len(meshes), \ - "Colors must be length of meshes list, 1 else None" - - points = kwargs.get('points') - if points is not None: - points = [ - Sphere(p, r=30, c='grey') - for p in points - ] - - meshes_vedo = [] - for i, mesh in enumerate(meshes): - if i == 0: - vertices = np.array(mesh.vertices) - start_locations = np.array([mesh.vertices[0]]) - else: - vertices = np.concatenate( - (vertices, np.array(mesh.vertices)), - axis=0 - ) - start_locations = np.concatenate( - (start_locations, np.array([mesh.vertices[0]])), - axis=0 - ) - - # convert to vedo mesh - m_vedo = trimesh2vedo(mesh) - if colors is not None: - m_vedo.c(colors[i]) - if names is not None: - m_vedo.name = names[i] - # m_vedo.flag() - meshes_vedo.append(m_vedo) - - for mesh in meshes_vedo: - if mesh is None: - continue - self.AddActor(mesh) - - for obj in kwargs.values(): - if isinstance(obj, list): - for item in obj: - try: - self.AddActor(item) - except TypeError: - pass - else: - try: - self.AddActor(obj) - except TypeError: - pass - - self.namedColors = vtkNamedColors() - - self.colors = {} - self.colors['background'] = kwargs.get('background', 'LightGrey') - self.colors['background2'] = kwargs.get('background', 'Lavender') + super().__init__(*args, **kwargs) - def add_axes(self, **kwargs): - axes = CubeAxes(self, **kwargs) - self.AddActor(axes) + self.wells = [] - def get_center(self): - min, max = np.array(self.ComputeVisiblePropBounds()).reshape(-1, 3) - center = min + (max - min) / 2 - - return tuple(center) + pass - def show(self, add_axes=True, orientation_marker=False, **kwargs): + def add(self, obj, *args, **kwargs) -> None: + """Modified method to support direct plotting of + ``welleng.mesh.WellMesh`` instances and for processing the callback + to print well data when the pointer is hovered of a well trajectory. + + If the ``obj`` is a ``welleng.mesh.WellMesh`` instance, then the args + and kwargs will be passed to the `vedo.Mesh` instance to facilate e.g. + color options etc. + + Notes + ----- + ``welleng.mesh.WellMesh`` creates ``trimesh.Mesh`` instances, a legacy + of using the ``trimesh`` library for detecting mesh collisions when + developing automated well trajectory planning. Therefore, to visualize + the meshes with ``vedo`` and ``vtk``, the meshes need to be converted. + + Meshes in ``welleng`` typically reference an 'NEV' coordinate system, + which is [North, East, Vertical]. To map correctly to ``vtk``, North + needs to be mapped to X and East to Y on account of Z pointing down. """ - Convenient method for opening a window to view the rendered scene. - """ - if add_axes: - self.add_axes(**kwargs) - - self.GetActiveCamera().Azimuth(kwargs.get('azimuth', 30)) - self.GetActiveCamera().Elevation(kwargs.get('elevation', 30)) - self.GetActiveCamera().SetViewUp(0, 0, -1) - # self.GetActiveCamera().SetPosition(tuple(pos_new)) - self.GetActiveCamera().SetFocalPoint(self.get_center()) - - self.ResetCamera() - self.SetBackground( - self.namedColors.GetColor3d(self.colors['background']) - ) - self.SetBackground2( - self.namedColors.GetColor3d(self.colors['background2']) - ) - - setSize = kwargs.get('setSize', (1200, 900)) - - renderWindow = vtkRenderWindow() - - renderWindow.AddRenderer(self) - renderWindow.SetSize(*(setSize)) - renderWindow.SetWindowName(f'welleng {VERSION}') - - renderWindowInteractor = vtkRenderWindowInteractor() - renderWindowInteractor.SetRenderWindow(renderWindow) - renderWindowInteractor.SetInteractorStyle(vtkInteractorStyleTerrain()) - - renderWindow.Render() - self.GetActiveCamera().Zoom(0.8) + if isinstance(obj, mesh.WellMesh): + poly = buildPolyData(obj.mesh.vertices, obj.mesh.faces) + vedo_mesh = Mesh(poly, *args, *kwargs) + setattr(obj, 'vedo_mesh', vedo_mesh) + self.wells.append(obj) + super().add(obj.vedo_mesh) + else: + super().add(obj) - interactive = kwargs.get('interactive', True) + pass - if orientation_marker: - cow = vtkCameraOrientationWidget() - cow.SetParentRenderer(self) - # cow.SetDefaultRenderer(self) + def _initiate_axes_actor(self): + plt = vedo.plotter_instance + r = plt.renderers.index(plt.renderer) + + axact = vtkAxesActor() + axact.SetShaftTypeToCylinder() + axact.SetCylinderRadius(0.03) + axact.SetXAxisLabelText("N") + axact.SetYAxisLabelText("E") + axact.SetZAxisLabelText("V") + axact.GetXAxisShaftProperty().SetColor(1, 0, 0) + axact.GetYAxisShaftProperty().SetColor(0, 1, 0) + axact.GetZAxisShaftProperty().SetColor(0, 0, 1) + axact.GetXAxisTipProperty().SetColor(1, 0, 0) + axact.GetYAxisTipProperty().SetColor(0, 1, 0) + axact.GetZAxisTipProperty().SetColor(0, 0, 1) + bc = np.array(plt.renderer.GetBackground()) + if np.sum(bc) < 1.5: + lc = (1, 1, 1) + else: + lc = (0, 0, 0) + axact.GetXAxisCaptionActor2D().GetCaptionTextProperty().BoldOff() + axact.GetYAxisCaptionActor2D().GetCaptionTextProperty().BoldOff() + axact.GetZAxisCaptionActor2D().GetCaptionTextProperty().BoldOff() + axact.GetXAxisCaptionActor2D().GetCaptionTextProperty().ItalicOff() + axact.GetYAxisCaptionActor2D().GetCaptionTextProperty().ItalicOff() + axact.GetZAxisCaptionActor2D().GetCaptionTextProperty().ItalicOff() + axact.GetXAxisCaptionActor2D().GetCaptionTextProperty().ShadowOff() + axact.GetYAxisCaptionActor2D().GetCaptionTextProperty().ShadowOff() + axact.GetZAxisCaptionActor2D().GetCaptionTextProperty().ShadowOff() + axact.GetXAxisCaptionActor2D().GetCaptionTextProperty().SetColor(lc) + axact.GetYAxisCaptionActor2D().GetCaptionTextProperty().SetColor(lc) + axact.GetZAxisCaptionActor2D().GetCaptionTextProperty().SetColor(lc) + axact.PickableOff() + icn = Icon(axact, size=0.1) + plt.axes_instances[r] = icn + icn.SetInteractor(plt.interactor) + icn.EnabledOn() + icn.InteractiveOff() + plt.widgets.append(icn) + + def _pointer_callback(self, event): + i = event.at + pt2d = event.picked2d + objs = self.at(i).objects + pt3d = self.at(i).compute_world_coordinate(pt2d, objs=objs) + if mag(pt3d) < 0.01: + if self.pointer is None: + return + # if self.pointer in self.at(i).actors: + self.at(i).remove(self.pointer) + self.pointer = None + self.pointer_text.text('') + self.render() + return + if self.pointer is None: + self.pointer = Point().color('red').pos(pt3d) + else: + self.pointer.pos(pt3d) + self.at(i).add(self.pointer) - cow.Off() - cow.EnabledOn() + well_data = self._get_closest_well(pt3d, objs) + if well_data is None: + self.pointer_text.text(f'point coordinates: {np.round(pt3d, 3)}') + else: + survey = well_data.get('well').s + idx = well_data.get('idx_survey') + name = survey.header.name + md = survey.md[idx] + inc = survey.inc_deg[idx] + azi_grid = survey.azi_grid_deg[idx] + self.pointer_text.text(f''' + well name: {name}\n + md: {md:.2f}\t inc: {inc:.2f}\t azi: {azi_grid:.2f}\n + point coordinates: {np.round(pt3d, 3)} + ''') + self.render() + + def _well_vedo_meshes(self): + return [well.vedo_mesh for well in self.wells] + + def _get_closest_well(self, pos, objs) -> dict: + wells = [ + well for well in self.wells + if well.vedo_mesh in objs + ] + if not bool(wells): + return + + results = np.zeros((len(wells), 3)) + for i, well in enumerate(wells): + tree = KDTree(well.vertices.reshape(-1, 3)) + distance, idx_vertices = tree.query(pos) + results[i] = np.array([distance, idx_vertices, well.n_verts]) + + winner = np.argmin(results[:, 0]) + distance, idx_vertices, n_verts = results[winner] + + return { + 'well': wells[winner], + 'distance': distance, + 'idx_vertices': int(idx_vertices), + 'n_verts': int(n_verts), + 'idx_survey': int(idx_vertices // n_verts) + } + + def show(self, axes=None, *args, **kwargs): + # check if there's an axes and if so remove them + if self.axes is not None: + self.remove(self.axes) + + self._initiate_axes_actor() + + self.add_callback('mouse move', self._pointer_callback) + self.pointer_text = Text2D("", pos='bottom-right', s=0.5, c='black') + self.add(self.pointer_text) + self.pointer = None + + self = super().show( + viewup=[0, 0, -1], mode=8, + axes=CubeAxes() if axes is None else axes, + title=f'welleng {VERSION}', + *args, **kwargs + ) - if interactive: - renderWindowInteractor.Start() + return self class CubeAxes(vtkCubeAxesActor): - def __init__(self, renderer, **kwargs): + def __init__( + self, + **kwargs + ): super().__init__() - # Determine the bounds from the meshes/actors being plotted rounded - # up to the nearest 100 units. - bounds = np.array(renderer.ComputeVisiblePropBounds()) + # # Determine the bounds from the meshes/actors being plotted rounded + # # up to the nearest 100 units. + # bounds = np.array(renderer.ComputeVisiblePropBounds()) + + # with np.errstate(divide='ignore', invalid='ignore'): + # self.bounds = tuple(np.nan_to_num( + # ( + # np.ceil(np.abs(bounds) / 100) * 100 + # ) * (bounds / np.abs(bounds)) + # )) - with np.errstate(divide='ignore', invalid='ignore'): - self.bounds = tuple(np.nan_to_num( - ( - np.ceil(np.abs(bounds) / 100) * 100 - ) * (bounds / np.abs(bounds)) - )) + plt = vedo.plotter_instance + r = plt.renderers.index(plt.renderer) + vbb = compute_visible_bounds()[0] + self.SetBounds(vbb) + self.SetCamera(plt.renderer.GetActiveCamera()) namedColors = vtkNamedColors() self.colors = {} for n in range(1, 4): self.colors[f'axis{n}Color'] = namedColors.GetColor3d( - kwargs.get('axis1Color', 'DarkGrey') + kwargs.get(f'axis{n}Color', 'Black') ) - self.SetLabelScaling(0, 0, 0, 0) - - # Try and prevent scientific numbering on axes - self.SetXLabelFormat("%.0f") - self.SetYLabelFormat("%.0f") - self.SetZLabelFormat("%.0f") - - self.SetUseTextActor3D(1) + # self.SetUseTextActor3D(1) - self.SetBounds(self.bounds) - self.SetCamera(renderer.GetActiveCamera()) + # self.SetBounds(self.bounds) + # self.SetCamera(renderer.GetActiveCamera()) self.GetTitleTextProperty(0).SetColor(self.colors['axis1Color']) self.GetLabelTextProperty(0).SetColor(self.colors['axis1Color']) @@ -244,26 +256,45 @@ def __init__(self, renderer, **kwargs): self.GetLabelTextProperty(2).SetColor(self.colors['axis3Color']) self.GetLabelTextProperty(2).SetOrientation(45.0) - self.DrawXGridlinesOn() - self.DrawYGridlinesOn() - self.DrawZGridlinesOn() self.SetGridLineLocation(self.VTK_GRID_LINES_FURTHEST) + for a in ('X', 'Y', 'Z'): + getattr(self, f'Get{a}AxesLinesProperty')().SetColor( + namedColors.GetColor3d('Black') + ) + getattr(self, f'SetDraw{a}Gridlines')(1) + getattr(self, f'Get{a}AxesGridlinesProperty')().SetColor( + namedColors.GetColor3d('Grey') + ) + getattr(self, f'{a}AxisMinorTickVisibilityOff')() - self.XAxisMinorTickVisibilityOff() - self.YAxisMinorTickVisibilityOff() - self.ZAxisMinorTickVisibilityOff() + # self.DrawXGridlinesOn() + # self.DrawYGridlinesOn() + # self.DrawZGridlinesOn() + # self.SetGridLineLocation(self.VTK_GRID_LINES_FURTHEST) - units = kwargs.get('units', 'meters') - self.SetXTitle('East') + # self.XAxisMinorTickVisibilityOff() + # self.YAxisMinorTickVisibilityOff() + # self.ZAxisMinorTickVisibilityOff() + + units = kwargs.get('units', None) + self.SetXTitle('N') self.SetXUnits(units) - self.SetYTitle('North') + self.SetYTitle('E') self.SetYUnits(units) self.SetZTitle('TVD') self.SetZUnits(units) + # self.SetTickLocation(self.VTK_GRID_LINES_FURTHEST) + # self.ForceOpaqueOff() self.SetFlyModeToClosestTriad() + # Try and prevent scientific numbering on axes + self.SetLabelScaling(0, 0, 0, 0) + self.SetXLabelFormat("%.0f") + self.SetYLabelFormat("%.0f") + self.SetZLabelFormat("%.0f") - self.ForceOpaqueOff() + plt.axes_instances[r] = self + plt.renderer.AddActor(self) def plot(