From 3edd41d43de776c5de7f763ace1c20843d13c16e Mon Sep 17 00:00:00 2001 From: shilinlee <836160610@qq.com> Date: Sun, 28 Jan 2024 12:01:29 +0800 Subject: [PATCH] feat: support deploy monitoring server and valid topo configs (#61) Signed-off-by: shilinlee <836160610@qq.com> --- cmd/cluster/root.go | 4 + embed/examples/cluster/minimal.yaml | 139 ++-- embed/examples/cluster/topology.example.yaml | 147 ++-- embed/templates/config/ts-monitor.toml.tpl | 54 ++ embed/templates/dashboards/error_logs.json | 2 +- embed/templates/scripts/run_ts_server.sh.tpl | 12 + pkg/cluster/manager/basic.go | 68 +- pkg/cluster/manager/builder.go | 168 +++- pkg/cluster/manager/check.go | 25 +- pkg/cluster/manager/install.go | 55 +- pkg/cluster/operation/action.go | 148 +++- pkg/cluster/operation/uninstall.go | 55 +- pkg/cluster/spec/grafana.go | 36 +- pkg/cluster/spec/instance.go | 7 +- pkg/cluster/spec/monitoring.go | 225 ++++++ pkg/cluster/spec/parse_topology.go | 106 +-- pkg/cluster/spec/spec.go | 252 ++++-- pkg/cluster/spec/spec_manager.go | 43 +- pkg/cluster/spec/spec_test.go | 58 ++ pkg/cluster/spec/ts_meta.go | 39 +- pkg/cluster/spec/ts_monitor.go | 37 +- pkg/cluster/spec/ts_sql.go | 10 +- pkg/cluster/spec/ts_store.go | 2 +- pkg/cluster/spec/validate.go | 758 ++++++++++++++++++- pkg/cluster/task/builder.go | 16 + pkg/cluster/task/install_package.go | 2 +- pkg/cluster/task/monitored_config.go | 162 ++++ pkg/cluster/template/config/ts-monitor.go | 71 ++ pkg/cluster/template/scripts/ts_monitor.go | 29 +- pkg/cluster/template/scripts/ts_server.go | 51 ++ pkg/cluster/template/template.go | 21 + pkg/gui/cliutil.go | 29 + pkg/gui/gui.go | 2 +- pkg/meta/err.go | 71 ++ pkg/utils/utils.go | 10 + 35 files changed, 2568 insertions(+), 346 deletions(-) create mode 100644 embed/templates/config/ts-monitor.toml.tpl create mode 100644 embed/templates/scripts/run_ts_server.sh.tpl create mode 100644 pkg/cluster/spec/monitoring.go create mode 100644 pkg/cluster/spec/spec_test.go create mode 100644 pkg/cluster/task/monitored_config.go create mode 100644 pkg/cluster/template/config/ts-monitor.go create mode 100644 pkg/cluster/template/scripts/ts_server.go create mode 100644 pkg/cluster/template/template.go create mode 100644 pkg/meta/err.go diff --git a/cmd/cluster/root.go b/cmd/cluster/root.go index 93519e5..5bbb385 100644 --- a/cmd/cluster/root.go +++ b/cmd/cluster/root.go @@ -43,6 +43,10 @@ var cm *manager.Manager func init() { logger.InitGlobalLogger() + gui.AddColorFunctionsForCobra() + + cobra.EnableCommandSorting = false + ClusterCmd = &cobra.Command{ Use: "cluster", Short: "Deploy an openGemini cluster for production", diff --git a/embed/examples/cluster/minimal.yaml b/embed/examples/cluster/minimal.yaml index 4784f6f..7afa919 100644 --- a/embed/examples/cluster/minimal.yaml +++ b/embed/examples/cluster/minimal.yaml @@ -8,44 +8,41 @@ global: ### group is used to specify the group name the user belong to if it's not the same as user. # group: "root" ### Storage directory for cluster deployment files, startup scripts, and configuration files. - deploy_dir: "/var/lib/openGemini/deploy" - ### openGemini Cluster log file storage directory. - log_dir: "/var/lib/openGemini/logs" + deploy_dir: "/gemini-deploy" ### openGemini Cluster data storage directory - data_dir: "/gemini-data/meta" - # operating system, linux/darwin. + data_dir: "/gemini-data" + ### operating system, linux/darwin. os: "linux" - # Supported values: "amd64", "arm64" (default: "amd64"). + ### Supported values: "amd64", "arm64" (default: "amd64"). 
arch: "amd64" - # # Resource Control is used to limit the resource of an instance. - # # See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html - # # Supports using instance-level `resource_control` to override global `resource_control`. + ### Resource Control is used to limit the resource of an instance. + ### See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html + ### Supports using instance-level `resource_control` to override global `resource_control`. # resource_control: - # # See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#MemoryLimit=bytes - # memory_limit: "2G" - # # See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#CPUQuota= - # # The percentage specifies how much CPU time the unit shall get at maximum, relative to the total CPU time available on one CPU. Use values > 100% for allotting CPU time on more than one CPU. - # # Example: CPUQuota=200% ensures that the executed processes will never get more than two CPU time. - # cpu_quota: "200%" - # # See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#IOReadBandwidthMax=device%20bytes - # io_read_bandwidth_max: "/dev/disk/by-path/pci-0000:00:1f.2-scsi-0:0:0:0 100M" - # io_write_bandwidth_max: "/dev/disk/by-path/pci-0000:00:1f.2-scsi-0:0:0:0 100M" + ### See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#MemoryLimit=bytes + # memory_limit: "2G" + ### See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#CPUQuota= + ### The percentage specifies how much CPU time the unit shall get at maximum, relative to the total CPU time available on one CPU. Use values > 100% for allotting CPU time on more than one CPU. + ### Example: CPUQuota=200% ensures that the executed processes will never get more than two CPU time. + # cpu_quota: "200%" + ### See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#IOReadBandwidthMax=device%20bytes + # io_read_bandwidth_max: "/dev/disk/by-path/pci-0000:00:1f.2-scsi-0:0:0:0 100M" + # io_write_bandwidth_max: "/dev/disk/by-path/pci-0000:00:1f.2-scsi-0:0:0:0 100M" ### Monitored variables are applied to all the machines. monitored: ### Enable ts-monitor instance for all the machines ts_monitor_enabled: true ### Storage directory for deployment files, startup scripts, and configuration files of monitoring components. - # deploy_dir: "/var/lib/openGemini/deploy" + # deploy_dir: "/gemini-deploy/ts-monitor" ### Log storage directory of the ts-monitor component. - # log_dir: "/var/lib/openGemini/logs" + # log_dir: "/gemini-deploy/ts-monitor/logs" ### Server configs are used to specify the runtime configuration of openGemini components. ### All configuration items can be found in openGemini docs: ### - ts-meta: https://docs.opengemini.org/ ### - ts-sql: https://docs.opengemini.org/ ### - ts-store: https://docs.opengemini.org/ - ### - ts-monitor: https://docs.opengemini.org/ ### ### All configuration items use points to represent the hierarchy, e.g: ### common.ha-policy @@ -53,10 +50,9 @@ monitored: ### - example: https://github.com/openGemini/openGemini-UP/blob/main/embed/examples/cluster/topology.example.yaml ### You can overwrite this configuration via the instance-level `config` field. # server_configs: - # ts-meta: - # ts-sql: - # ts-store: - # ts-monitor: + # ts-meta: + # ts-sql: + # ts-store: # Server configs are used to specify the configuration of ts-meta Servers. 
ts_meta_servers: @@ -65,7 +61,7 @@ ts_meta_servers: ### SSH port of the server. # ssh_port: 22 ### Access the ts-meta cluster port. (for devops) - client_port: 8091 + # client_port: 8091 ### communication port among ts-meta Server nodes. # peer_port: 8092 ### communication raft port among ts-meta Server nodes. @@ -73,35 +69,35 @@ ts_meta_servers: ### communication gossip port among ts-meta and ts-store Server nodes. # gossip_port: 8010 ### ts-meta Server deployment file, startup script, configuration file storage directory. - deploy_dir: "/var/lib/openGemini/deploy" + # deploy_dir: "/gemini-deploy/ts-meta-8091" ### ts-meta Server logs storage directory. - log_dir: "/var/lib/openGemini/logs" + # log_dir: "/gemini-deploy/ts-meta-8091/logs" ### ts-meta Server meta data storage directory. - data_dir: "/var/lib/openGemini/meta" + # data_dir: "/gemini-data/ts-meta-8091" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.12 # ssh_port: 22 # client_port: 8091 # peer_port: 8092 # raft_port: 8088 # gossip_port: 8010 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" - data_dir: "/var/lib/openGemini/meta" + # deploy_dir: "/gemini-deploy/ts-meta-8091" + # log_dir: "/gemini-deploy/ts-meta-8091/logs" + # data_dir: "/gemini-data/ts-meta-8091" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.13 # ssh_port: 22 # client_port: 8091 # peer_port: 8092 # raft_port: 8088 # gossip_port: 8010 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" - data_dir: "/var/lib/openGemini/meta" + # deploy_dir: "/gemini-deploy/ts-meta-8091" + # log_dir: "/gemini-deploy/ts-meta-8091/logs" + # data_dir: "/gemini-data/ts-meta-8091" # config: - # logging.level: warning + # logging.level: warn ### Server configs are used to specify the configuration of ts-sql Servers. ts_sql_servers: @@ -112,25 +108,25 @@ ts_sql_servers: ### Access the openGemini cluster port. # port: 8086 ### ts-sql Server deployment file, startup script, configuration file storage directory. - deploy_dir: "/var/lib/openGemini/deploy" + # deploy_dir: "/gemini-deploy/ts-sql-8086" ### ts-sql Server logs storage directory. - log_dir: "/var/lib/openGemini/logs" + # log_dir: "/gemini-deploy/ts-sql-8086/logs" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.15 # ssh_port: 22 # port: 8086 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" + # deploy_dir: "/gemini-deploy/ts-sql-8086" + # log_dir: "/gemini-deploy/ts-sql-8086/logs" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.16 # ssh_port: 22 # port: 8086 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" + # deploy_dir: "/gemini-deploy/ts-sql-8086" + # log_dir: "/gemini-deploy/ts-sql-8086/logs" # config: - # logging.level: warning + # logging.level: warn ### Server configs are used to specify the configuration of ts-store Servers. ts_store_servers: @@ -145,33 +141,54 @@ ts_store_servers: ### communication gossip port among ts-meta and ts-store Server nodes. # gossip_port: 8011 ### ts-store Server deployment file, startup script, configuration file storage directory. - deploy_dir: "/var/lib/openGemini/deploy" + # deploy_dir: "/gemini-deploy/ts-store-8401" ### ts-store Server logs storage directory. - log_dir: "/var/lib/openGemini/logs" + # log_dir: "/gemini-deploy/ts-store-8401/logs" ### ts-store Server meta data storage directory. 
- data_dir: "/var/lib/openGemini/data" + # data_dir: "/gemini-data/ts-store-8401" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.15 # ssh_port: 22 # ingest_port: 8400 # select_port: 8401 # gossip_port: 8011 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" - data_dir: "/var/lib/openGemini/data" + # deploy_dir: "/gemini-deploy/ts-store-8401" + # log_dir: "/gemini-deploy/ts-store-8401/logs" + # data_dir: "/gemini-data/ts-store-8401" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.16 # ssh_port: 22 # ingest_port: 8400 # select_port: 8401 # gossip_port: 8011 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" - data_dir: "/var/lib/openGemini/data" + # deploy_dir: "/gemini-deploy/ts-store-8401" + # log_dir: "/gemini-deploy/ts-store-8401/logs" + # data_dir: "/gemini-data/ts-store-8401" # config: - # logging.level: warning + # logging.level: warn + +### Server configs are used to specify the configuration of ts-server Server. +monitoring_servers: + ### The ip address of the Monitoring Server. + - host: 10.0.1.17 + ### SSH port of the server. + # ssh_port: 22 + ### ts-server Service communication port. + # port: 8186 + ### Access the ts-meta cluster port. (for devops) + # client_port: 8191 + # peer_port: 8192 + # raft_port: 8188 + # ingest_port: 8410 + # select_port: 8411 + ### ts-server deployment file, startup script, configuration file storage directory. + # deploy_dir: "/gemini-deploy/ts-server-8186" + ### ts-server log file storage directory. + # log_dir: "/gemini-deploy/ts-server-8186/logs" + ### ts-server data storage directory. + # data_dir: "/gemini-data/ts-server-8186" ### Server configs are used to specify the configuration of Grafana Servers. grafana_servers: @@ -180,8 +197,8 @@ grafana_servers: ### Grafana Web monitoring service client (browser) access port # port: 3000 ### Grafana deployment file, startup script, configuration file storage directory. - # deploy_dir: /var/lib/openGemini/deploy/grafana-3000 + # deploy_dir: /gemini-deploy/grafana-3000 ### grafana dashboard dir on gemix machine # dashboard_dir: /home/gemini/dashboards # config: - # log.file.level: warning \ No newline at end of file + # log.file.level: warning diff --git a/embed/examples/cluster/topology.example.yaml b/embed/examples/cluster/topology.example.yaml index 4784f6f..4b3f6a5 100644 --- a/embed/examples/cluster/topology.example.yaml +++ b/embed/examples/cluster/topology.example.yaml @@ -8,44 +8,41 @@ global: ### group is used to specify the group name the user belong to if it's not the same as user. # group: "root" ### Storage directory for cluster deployment files, startup scripts, and configuration files. - deploy_dir: "/var/lib/openGemini/deploy" - ### openGemini Cluster log file storage directory. - log_dir: "/var/lib/openGemini/logs" + deploy_dir: "/gemini-deploy" ### openGemini Cluster data storage directory - data_dir: "/gemini-data/meta" - # operating system, linux/darwin. + data_dir: "/gemini-data" + ### operating system, linux/darwin. os: "linux" - # Supported values: "amd64", "arm64" (default: "amd64"). + ### Supported values: "amd64", "arm64" (default: "amd64"). arch: "amd64" - # # Resource Control is used to limit the resource of an instance. - # # See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html - # # Supports using instance-level `resource_control` to override global `resource_control`. 
+ ### Resource Control is used to limit the resource of an instance. + ### See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html + ### Supports using instance-level `resource_control` to override global `resource_control`. # resource_control: - # # See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#MemoryLimit=bytes - # memory_limit: "2G" - # # See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#CPUQuota= - # # The percentage specifies how much CPU time the unit shall get at maximum, relative to the total CPU time available on one CPU. Use values > 100% for allotting CPU time on more than one CPU. - # # Example: CPUQuota=200% ensures that the executed processes will never get more than two CPU time. - # cpu_quota: "200%" - # # See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#IOReadBandwidthMax=device%20bytes - # io_read_bandwidth_max: "/dev/disk/by-path/pci-0000:00:1f.2-scsi-0:0:0:0 100M" - # io_write_bandwidth_max: "/dev/disk/by-path/pci-0000:00:1f.2-scsi-0:0:0:0 100M" + ### See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#MemoryLimit=bytes + # memory_limit: "2G" + ### See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#CPUQuota= + ### The percentage specifies how much CPU time the unit shall get at maximum, relative to the total CPU time available on one CPU. Use values > 100% for allotting CPU time on more than one CPU. + ### Example: CPUQuota=200% ensures that the executed processes will never get more than two CPU time. + # cpu_quota: "200%" + ### See: https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#IOReadBandwidthMax=device%20bytes + # io_read_bandwidth_max: "/dev/disk/by-path/pci-0000:00:1f.2-scsi-0:0:0:0 100M" + # io_write_bandwidth_max: "/dev/disk/by-path/pci-0000:00:1f.2-scsi-0:0:0:0 100M" ### Monitored variables are applied to all the machines. monitored: ### Enable ts-monitor instance for all the machines ts_monitor_enabled: true ### Storage directory for deployment files, startup scripts, and configuration files of monitoring components. - # deploy_dir: "/var/lib/openGemini/deploy" + # deploy_dir: "/gemini-deploy/ts-monitor" ### Log storage directory of the ts-monitor component. - # log_dir: "/var/lib/openGemini/logs" + # log_dir: "/gemini-deploy/ts-monitor/logs" ### Server configs are used to specify the runtime configuration of openGemini components. ### All configuration items can be found in openGemini docs: ### - ts-meta: https://docs.opengemini.org/ ### - ts-sql: https://docs.opengemini.org/ ### - ts-store: https://docs.opengemini.org/ - ### - ts-monitor: https://docs.opengemini.org/ ### ### All configuration items use points to represent the hierarchy, e.g: ### common.ha-policy @@ -53,10 +50,9 @@ monitored: ### - example: https://github.com/openGemini/openGemini-UP/blob/main/embed/examples/cluster/topology.example.yaml ### You can overwrite this configuration via the instance-level `config` field. # server_configs: - # ts-meta: - # ts-sql: - # ts-store: - # ts-monitor: + # ts-meta: + # ts-sql: + # ts-store: # Server configs are used to specify the configuration of ts-meta Servers. ts_meta_servers: @@ -73,35 +69,35 @@ ts_meta_servers: ### communication gossip port among ts-meta and ts-store Server nodes. # gossip_port: 8010 ### ts-meta Server deployment file, startup script, configuration file storage directory. 
- deploy_dir: "/var/lib/openGemini/deploy" + deploy_dir: "/gemini-deploy/ts-meta-8091" ### ts-meta Server logs storage directory. - log_dir: "/var/lib/openGemini/logs" + log_dir: "/gemini-deploy/ts-meta-8091/logs" ### ts-meta Server meta data storage directory. - data_dir: "/var/lib/openGemini/meta" + data_dir: "/gemini-data/ts-meta-8091" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.12 # ssh_port: 22 - # client_port: 8091 + client_port: 8091 # peer_port: 8092 # raft_port: 8088 # gossip_port: 8010 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" - data_dir: "/var/lib/openGemini/meta" + deploy_dir: "/gemini-deploy/ts-meta-8091" + log_dir: "/gemini-deploy/ts-meta-8091/logs" + data_dir: "/gemini-data/ts-meta-8091" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.13 # ssh_port: 22 - # client_port: 8091 + client_port: 8091 # peer_port: 8092 # raft_port: 8088 # gossip_port: 8010 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" - data_dir: "/var/lib/openGemini/meta" + deploy_dir: "/gemini-deploy/ts-meta-8091" + log_dir: "/gemini-deploy/ts-meta-8091/logs" + data_dir: "/gemini-data/ts-meta-8091" # config: - # logging.level: warning + # logging.level: warn ### Server configs are used to specify the configuration of ts-sql Servers. ts_sql_servers: @@ -110,27 +106,27 @@ ts_sql_servers: ### SSH port of the server. # ssh_port: 22 ### Access the openGemini cluster port. - # port: 8086 + port: 8086 ### ts-sql Server deployment file, startup script, configuration file storage directory. - deploy_dir: "/var/lib/openGemini/deploy" + deploy_dir: "/gemini-deploy/ts-sql-8086" ### ts-sql Server logs storage directory. - log_dir: "/var/lib/openGemini/logs" + log_dir: "/gemini-deploy/ts-sql-8086/logs" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.15 # ssh_port: 22 - # port: 8086 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" + port: 8086 + deploy_dir: "/gemini-deploy/ts-sql-8086" + log_dir: "/gemini-deploy/ts-sql-8086/logs" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.16 # ssh_port: 22 - # port: 8086 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" + port: 8086 + deploy_dir: "/gemini-deploy/ts-sql-8086" + log_dir: "/gemini-deploy/ts-sql-8086/logs" # config: - # logging.level: warning + # logging.level: warn ### Server configs are used to specify the configuration of ts-store Servers. ts_store_servers: @@ -145,33 +141,54 @@ ts_store_servers: ### communication gossip port among ts-meta and ts-store Server nodes. # gossip_port: 8011 ### ts-store Server deployment file, startup script, configuration file storage directory. - deploy_dir: "/var/lib/openGemini/deploy" + deploy_dir: "/gemini-deploy/ts-store-8401" ### ts-store Server logs storage directory. - log_dir: "/var/lib/openGemini/logs" + log_dir: "/gemini-deploy/ts-store-8401/logs" ### ts-store Server meta data storage directory. 
- data_dir: "/var/lib/openGemini/data" + data_dir: "/gemini-data/ts-store-8401" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.15 # ssh_port: 22 # ingest_port: 8400 # select_port: 8401 # gossip_port: 8011 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" - data_dir: "/var/lib/openGemini/data" + deploy_dir: "/gemini-deploy/ts-store-8401" + log_dir: "/gemini-deploy/ts-store-8401/logs" + data_dir: "/gemini-data/ts-store-8401" # config: - # logging.level: warning + # logging.level: warn - host: 10.0.1.16 # ssh_port: 22 # ingest_port: 8400 # select_port: 8401 # gossip_port: 8011 - deploy_dir: "/var/lib/openGemini/deploy" - log_dir: "/var/lib/openGemini/logs" - data_dir: "/var/lib/openGemini/data" + deploy_dir: "/gemini-deploy/ts-store-8401" + log_dir: "/gemini-deploy/ts-store-8401/logs" + data_dir: "/gemini-data/ts-store-8401" # config: - # logging.level: warning + # logging.level: warn + +### Server configs are used to specify the configuration of ts-server Server. +monitoring_servers: + ### The ip address of the Monitoring Server. + - host: 10.0.1.17 + ### SSH port of the server. + # ssh_port: 22 + ### ts-server Service communication port. + # port: 8186 + ### Access the ts-meta cluster port. (for devops) + # client_port: 8191 + # peer_port: 8192 + # raft_port: 8188 + # ingest_port: 8410 + # select_port: 8411 + ### ts-server deployment file, startup script, configuration file storage directory. + # deploy_dir: "/gemini-deploy/ts-server-8186" + ### ts-server log file storage directory. + # log_dir: "/gemini-deploy/ts-server-8186/logs" + ### ts-server data storage directory. + # data_dir: "/gemini-data/ts-server-8186" ### Server configs are used to specify the configuration of Grafana Servers. grafana_servers: @@ -180,8 +197,8 @@ grafana_servers: ### Grafana Web monitoring service client (browser) access port # port: 3000 ### Grafana deployment file, startup script, configuration file storage directory. - # deploy_dir: /var/lib/openGemini/deploy/grafana-3000 + # deploy_dir: /gemini-deploy/grafana-3000 ### grafana dashboard dir on gemix machine # dashboard_dir: /home/gemini/dashboards # config: - # log.file.level: warning \ No newline at end of file + # log.file.level: warning diff --git a/embed/templates/config/ts-monitor.toml.tpl b/embed/templates/config/ts-monitor.toml.tpl new file mode 100644 index 0000000..be75f82 --- /dev/null +++ b/embed/templates/config/ts-monitor.toml.tpl @@ -0,0 +1,54 @@ +[monitor] + # localhost ip + host = "{{.Host}}" + # Indicates the path of the metric file generated by the kernel. References openGemini.conf: [monitor] store-path + # metric-path cannot have subdirectories + metric-path = "{{.MetricPath}}" + # Indicates the path of the log file generated by the kernel. References openGemini.conf: [logging] path + # error-log-path cannot have subdirectories + error-log-path = "{{.ErrorLogPath}}" + # Data disk path. References openGemini.conf: [data] store-data-dir + disk-path = "{{.DataPath}}" + # Wal disk path. References openGemini.conf: [data] store-wal-dir + aux-disk-path = "{{.WALPath}}" + # Name of the process to be monitored. Optional Value: ts-store,ts-sql,ts-meta. + # Determined based on the actual process running on the local node. + process = "{{.ProcessName}}" + # the history file reported error-log names. + history-file = "history.txt" + # Is the metric compressed. + compress = false + +[query] + # query for some DDL. Report for these data to monitor cluster. 
+  # - SHOW DATABASES
+  # - SHOW MEASUREMENTS
+  # - SHOW SERIES CARDINALITY FROM mst
+  query-enable = false
+  http-endpoint = "127.0.0.x:8086"
+  query-interval = "5m"
+  # username = ""
+  # password = ""
+  # https-enable = false
+
+[report]
+  # Address for metric data to be reported.
+  address = "{{.MonitorAddr}}"
+  # Database name for metric data to be reported.
+  database = "{{.MonitorDB}}"
+  rp = "autogen"
+  rp-duration = "168h"
+  # username = ""
+  # password = ""
+{{- if .TLSEnabled}}
+  https-enable = true
+{{- end}}
+
+[logging]
+  format = "auto"
+  level = "info"
+  path = "{{.LoggingPath}}"
+  max-size = "64m"
+  max-num = 30
+  max-age = 7
+  compress-enabled = true
diff --git a/embed/templates/dashboards/error_logs.json b/embed/templates/dashboards/error_logs.json
index f79fbf4..898c6b2 100644
--- a/embed/templates/dashboards/error_logs.json
+++ b/embed/templates/dashboards/error_logs.json
@@ -483,5 +483,5 @@
   "timezone": "browser",
   "title": "Test-Cluster-Error-Logs",
   "uid": "n57uE1Z4k",
-  "version": 19
+  "version": 1
 }
\ No newline at end of file
diff --git a/embed/templates/scripts/run_ts_server.sh.tpl b/embed/templates/scripts/run_ts_server.sh.tpl
new file mode 100644
index 0000000..1391bca
--- /dev/null
+++ b/embed/templates/scripts/run_ts_server.sh.tpl
@@ -0,0 +1,12 @@
+#!/bin/bash
+set -e
+
+# WARNING: This file was auto-generated. Do not edit!
+# All your edits might be overwritten!
+DEPLOY_DIR={{.DeployDir}}
+
+cd "${DEPLOY_DIR}" || exit 1
+
+exec env GODEBUG=madvdontneed=1 bin/ts-server \
+    --config=conf/ts-server.toml \
+    >> "{{.LogDir}}/server_extra.log"
diff --git a/pkg/cluster/manager/basic.go b/pkg/cluster/manager/basic.go
index 48efb50..e3e3965 100644
--- a/pkg/cluster/manager/basic.go
+++ b/pkg/cluster/manager/basic.go
@@ -14,27 +14,71 @@
 
 package manager
 
-import "github.com/openGemini/gemix/pkg/cluster/spec"
+import (
+	"path/filepath"
 
-type hostInfo struct {
-	ssh int // ssh port of host
-	os string // operating system
-	arch string // cpu architecture
-}
+	"github.com/openGemini/gemix/pkg/cluster/spec"
+	"github.com/openGemini/gemix/pkg/set"
+	"github.com/openGemini/gemix/pkg/utils"
+)
 
 // getAllUniqueHosts gets all the instance
 func getAllUniqueHosts(topo spec.Topology) map[string]spec.MonitorHostInfo {
 	// monitor
 	uniqueHosts := make(map[string]spec.MonitorHostInfo) // host -> ssh-port, os, arch
 	topo.IterInstance(func(inst spec.Instance) {
 		if _, found := uniqueHosts[inst.GetManageHost()]; !found {
 			uniqueHosts[inst.GetManageHost()] = spec.MonitorHostInfo{
 				Ssh:  inst.GetSSHPort(),
 				Os:   inst.OS(),
 				Arch: inst.Arch(),
 			}
 		}
 	})
 
 	return uniqueHosts
 }
+
+// getMonitorHosts gets the hosts that need a ts-monitor instance
+func getMonitorHosts(topo spec.Topology) (map[string]*spec.MonitorHostInfo, set.StringSet) {
+	// monitor
+	uniqueHosts := make(map[string]*spec.MonitorHostInfo) // host -> ssh-port, os, arch
+	noAgentHosts := set.NewStringSet()
+	topo.IterInstance(func(inst spec.Instance) {
+		// add the instance to ignore list if it marks itself as ignore_exporter
+		//if inst.IgnoreMonitorAgent() {
+		//	noAgentHosts.Insert(inst.GetManageHost())
+		//}
+
+		if _, found := uniqueHosts[inst.GetManageHost()]; !found {
+			uniqueHosts[inst.GetManageHost()] = &spec.MonitorHostInfo{
+				Ssh:  inst.GetSSHPort(),
+				Os:   inst.OS(),
+				Arch: inst.Arch(),
+
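+				// ts-monitor scrapes kernel metric files and error logs from
+				// the instance's log directory on this host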
MetricPath: filepath.Join(inst.LogDir(), "metric"), + ErrorLogPath: inst.LogDir(), + ProcessName: set.NewStringSet(), + } + } else { + switch inst.ComponentName() { + case spec.ComponentTSMeta, + spec.ComponentTSStore, + spec.ComponentTSSql: + h.ProcessName.Insert(inst.ComponentName()) + default: + } + } + h := uniqueHosts[inst.GetManageHost()] + if inst.ComponentName() == spec.ComponentTSStore { + h.DataPath = inst.DataDir() + h.WALPath = filepath.Join(inst.DataDir(), "wal") + } + + if inst.ComponentName() == spec.ComponentTSServer { + h.MonitorAddr = utils.JoinHostPort(inst.GetHost(), inst.GetPort()) + } + + }) + + return uniqueHosts, noAgentHosts +} diff --git a/pkg/cluster/manager/builder.go b/pkg/cluster/manager/builder.go index e467178..3bd5807 100644 --- a/pkg/cluster/manager/builder.go +++ b/pkg/cluster/manager/builder.go @@ -50,7 +50,7 @@ func buildEnvInitTasks(topo spec.Topology, opt *InstallOptions, gOpt *operator.O t := task.NewBuilder(logger). RootSSH( host, - info.ssh, + info.Ssh, opt.User, sshConnProps.Password, sshConnProps.IdentityFile, @@ -61,13 +61,177 @@ func buildEnvInitTasks(topo spec.Topology, opt *InstallOptions, gOpt *operator.O UserAction(host, globalOptions.User, globalOptions.Group, opt.SkipCreateUser || globalOptions.User == opt.User). EnvInit(host, globalOptions.User, globalOptions.Group). Mkdir(globalOptions.User, host, dirs...). - BuildAsStep(fmt.Sprintf(" - Prepare %s:%d", host, info.ssh)) + BuildAsStep(fmt.Sprintf(" - Prepare %s:%d", host, info.Ssh)) envInitTasks = append(envInitTasks, t) } return envInitTasks } +func buildMonitoredDeployTask( + m *Manager, + clusterVersion string, + uniqueHosts map[string]*spec.MonitorHostInfo, // host -> ssh-port, os, arch + noAgentHosts set.StringSet, // hosts that do not deploy monitor agents + globalOptions *spec.GlobalOptions, + monitoredOptions *spec.TSMonitoredOptions, + gOpt operator.Options, + p *gui.SSHConnectionProps, +) (downloadCompTasks []*task.StepDisplay, deployCompTasks []*task.StepDisplay, err error) { + if monitoredOptions == nil || !monitoredOptions.TSMonitorEnabled { + return + } + + uniqueCompOSArch := set.NewStringSet() + // monitoring agents + for _, comp := range []string{spec.ComponentOpenGemini} { + for host, info := range uniqueHosts { + // skip deploying monitoring agents if the instance is marked so + if noAgentHosts.Exist(host) { + continue + } + + // populate unique comp-os-arch set + key := fmt.Sprintf("%s-%s-%s", comp, info.Os, info.Arch) + if found := uniqueCompOSArch.Exist(key); !found { + uniqueCompOSArch.Insert(key) + downloadCompTasks = append(downloadCompTasks, task.NewBuilder(m.logger). + Download(comp, info.Os, info.Arch, clusterVersion). + BuildAsStep(fmt.Sprintf(" - Download %s:%s (%s/%s)", comp, clusterVersion, info.Os, info.Arch))) + } + + deployDir := spec.Abs(globalOptions.User, monitoredOptions.DeployDir) + // log dir will always be with values, but might not be used by the component + logDir := spec.Abs(globalOptions.User, monitoredOptions.LogDir) + + deployDirs := []string{ + deployDir, + logDir, + filepath.Join(deployDir, "bin"), + filepath.Join(deployDir, "conf"), + filepath.Join(deployDir, "scripts"), + } + + // Deploy component + t := task.NewBuilder(m.logger). // TODO: only support root deploy user + RootSSH( + host, + info.Ssh, + globalOptions.User, + p.Password, + p.IdentityFile, + p.IdentityFilePassphrase, + gOpt.SSHTimeout, + gOpt.OptTimeout, + ). + //t := task.NewSimpleUerSSH(m.logger, inst.GetManageHost(), inst.GetSSHPort(), globalOptions.User, 0, 0). 
+				Mkdir(globalOptions.User, host, deployDirs...).
+				CopyComponent(
+					comp,
+					spec.ComponentTSMonitor,
+					info.Os,
+					info.Arch,
+					clusterVersion,
+					"", // use default srcPath
+					host,
+					deployDir,
+				)
+			//tb := task.NewSimpleUerSSH(m.logger, host, info.ssh, globalOptions.User, gOpt, p).
+			//	Mkdir(globalOptions.User, host, deployDirs...).
+			//	CopyComponent(
+			//		comp,
+			//		info.os,
+			//		info.arch,
+			//		version,
+			//		"",
+			//		host,
+			//		deployDir,
+			//	)
+			deployCompTasks = append(deployCompTasks, t.BuildAsStep(fmt.Sprintf(" - Deploy %s -> %s", comp, host)))
+		}
+	}
+	return
+}
+
+func buildInitMonitoredConfigTasks(
+	specManager *spec.SpecManager,
+	clusterName string,
+	uniqueHosts map[string]*spec.MonitorHostInfo, // host -> ssh-port, os, arch
+	noAgentHosts set.StringSet,
+	globalOptions spec.GlobalOptions,
+	monitoredOptions *spec.TSMonitoredOptions,
+	logger *logprinter.Logger,
+	sshTimeout, exeTimeout uint64,
+	gOpt operator.Options,
+	p *gui.SSHConnectionProps,
+) []*task.StepDisplay {
+	if monitoredOptions == nil || !monitoredOptions.TSMonitorEnabled {
+		return nil
+	}
+
+	var tasks []*task.StepDisplay
+	// monitoring agents
+	for _, comp := range []string{spec.ComponentTSMonitor} {
+		for host, info := range uniqueHosts {
+			if noAgentHosts.Exist(host) {
+				continue
+			}
+
+			deployDir := spec.Abs(globalOptions.User, monitoredOptions.DeployDir)
+			// log dir will always be with values, but might not be used by the component
+			logDir := spec.Abs(globalOptions.User, monitoredOptions.LogDir)
+			// Generate configs
+
+			t := task.NewBuilder(logger). // TODO: only support root deploy user
+				RootSSH(
+					host,
+					info.Ssh,
+					globalOptions.User,
+					p.Password,
+					p.IdentityFile,
+					p.IdentityFilePassphrase,
+					gOpt.SSHTimeout,
+					gOpt.OptTimeout,
+				).
+				MonitoredConfig(
+					clusterName,
+					comp,
+					host,
+					info,
+					globalOptions.ResourceControl,
+					monitoredOptions,
+					globalOptions.User,
+					globalOptions.TLSEnabled,
+					meta.DirPaths{
+						Deploy: deployDir,
+						Log:    logDir,
+						Cache:  specManager.Path(clusterName, spec.TempConfigPath),
+					},
+				).BuildAsStep(fmt.Sprintf(" - Generate config %s -> %s", comp, host))
+
+			//t := task.NewSimpleUerSSH(logger, host, info.ssh, globalOptions.User, gOpt, p, globalOptions.SSHType).
+			//	MonitoredConfig(
+			//		clusterName,
+			//		comp,
+			//		host,
+			//		globalOptions.ResourceControl,
+			//		monitoredOptions,
+			//		globalOptions.User,
+			//		globalOptions.TLSEnabled,
+			//		meta.DirPaths{
+			//			Deploy: deployDir,
+			//			Data:   []string{dataDir},
+			//			Log:    logDir,
+			//			Cache:  specManager.Path(clusterName, spec.TempConfigPath),
+			//		},
+			//	).
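+			// MonitoredConfig renders the ts-monitor configuration from the
+			// embedded template into the cluster cache dir and transfers it
+			// to the remote host (see pkg/cluster/task/monitored_config.go)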
+ + tasks = append(tasks, t) + } + } + return tasks +} + // buildDownloadCompTasks build download component tasks func buildDownloadCompTasks(clusterVersion string, topo spec.Topology, logger *logprinter.Logger) []*task.StepDisplay { var tasks []*task.StepDisplay diff --git a/pkg/cluster/manager/check.go b/pkg/cluster/manager/check.go index e85791c..1df0057 100644 --- a/pkg/cluster/manager/check.go +++ b/pkg/cluster/manager/check.go @@ -14,18 +14,21 @@ package manager -import "github.com/openGemini/gemix/pkg/cluster/spec" +import ( + "github.com/openGemini/gemix/pkg/cluster/spec" + "github.com/pkg/errors" +) // checkConflict checks cluster conflict func checkConflict(m *Manager, clusterName string, topo spec.Topology) error { - //clusterList, err := m.specManager.GetAllClusters() - //if err != nil { - // return err - //} - //// use a dummy cluster name, the real cluster name is set during deploy - //if err := spec.CheckClusterPortConflict(clusterList, clusterName, topo); err != nil { - // return err - //} - //err = spec.CheckClusterDirConflict(clusterList, clusterName, topo) - return nil + clusterList, err := m.specManager.GetAllClusters() + if err != nil { + return errors.WithStack(err) + } + // use a dummy cluster name, the real cluster name is set during deploy + if err = spec.CheckClusterPortConflict(clusterList, clusterName, topo); err != nil { + return errors.WithStack(err) + } + err = spec.CheckClusterDirConflict(clusterList, clusterName, topo) + return errors.WithStack(err) } diff --git a/pkg/cluster/manager/install.go b/pkg/cluster/manager/install.go index 2cee032..bf48441 100644 --- a/pkg/cluster/manager/install.go +++ b/pkg/cluster/manager/install.go @@ -90,6 +90,22 @@ func (m *Manager) Install( return errors.WithStack(err) } + instCnt := 0 + topo.IterInstance(func(inst spec.Instance) { + switch inst.ComponentName() { + // monitoring components are only useful when deployed with + // core components, we do not support deploying any bare + // monitoring system. + case spec.ComponentGrafana, + spec.ComponentTSServer: + return + } + instCnt++ + }) + if instCnt < 1 { + return fmt.Errorf("no valid instance found in the input topology, please check your config") + } + spec.ExpandRelativeDir(topo) if err = checkConflict(m, clusterName, topo); err != nil { @@ -152,6 +168,39 @@ func (m *Manager) Install( refreshConfigTasks := buildInitConfigTasks(m, clusterName, topo, metadata.GetBaseMeta(), gOpt) + uniqueHosts, noAgentHosts := getMonitorHosts(topo) + + // Deploy monitor relevant components to remote + dlTasks, dpTasks, err := buildMonitoredDeployTask( + m, + clusterVersion, + uniqueHosts, + noAgentHosts, + globalOptions, + topo.GetMonitoredOptions(), + gOpt, + sshConnProps, + ) + if err != nil { + return err + } + downloadCompTasks = append(downloadCompTasks, dlTasks...) + deployCompTasks = append(deployCompTasks, dpTasks...) + + monitorConfigTasks := buildInitMonitoredConfigTasks( + m.specManager, + clusterName, + uniqueHosts, + noAgentHosts, + *topo.BaseTopo().GlobalOptions, + topo.GetMonitoredOptions(), + m.logger, + gOpt.SSHTimeout, + gOpt.OptTimeout, + gOpt, + sshConnProps, + ) + builder := task.NewBuilder(m.logger). Step("+ Generate SSH keys", task.NewBuilder(m.logger). @@ -163,8 +212,8 @@ func (m *Manager) Install( ParallelStep("+ Mkdir at target hosts", false, mkdirTasks...). ParallelStep("+ Deploy openGemini instance", false, deployCompTasks...). //ParallelStep("+ Copy certificate to remote host", gOpt.Force, certificateTasks...). 
- ParallelStep("+ Init instance configs", gOpt.Force, refreshConfigTasks...) - //ParallelStep("+ Init monitor configs", gOpt.Force, monitorConfigTasks...) + ParallelStep("+ Init instance configs", gOpt.Force, refreshConfigTasks...). + ParallelStep("+ Init monitor configs", gOpt.Force, monitorConfigTasks...) t := builder.Build() @@ -187,7 +236,7 @@ func (m *Manager) Install( return err } - hint := color.New(color.FgBlue).Sprintf("%s start %s", "gemix cluster", clusterName) + hint := color.New(color.FgHiBlue).Sprintf("%s start %s", "gemix cluster", clusterName) m.logger.Infof("Cluster `%s` installed successfully, you can start it with command: `%s`\n", clusterName, hint) return nil } diff --git a/pkg/cluster/operation/action.go b/pkg/cluster/operation/action.go index 254d066..ab24a5a 100644 --- a/pkg/cluster/operation/action.go +++ b/pkg/cluster/operation/action.go @@ -59,6 +59,8 @@ func Start( nodeFilter := set.NewStringSet(options.Nodes...) components := cluster.ComponentsByStartOrder() components = FilterComponent(components, roleFilter) + TSMonitoredOptions := cluster.GetMonitoredOptions() + noAgentHosts := set.NewStringSet() for _, comp := range components { insts := FilterInstance(comp.Instances(), nodeFilter) @@ -69,6 +71,9 @@ func Start( errg, _ := errgroup.WithContext(ctx) for _, inst := range insts { + //if !inst.IgnoreMonitorAgent() { + // uniqueHosts.Insert(inst.GetManageHost()) + //} uniqueHosts.Insert(inst.GetManageHost()) } if err = errg.Wait(); err != nil { @@ -76,12 +81,15 @@ func Start( } } - //hosts := make([]string, 0, len(uniqueHosts)) - //for host := range uniqueHosts { - // hosts = append(hosts, host) - //} - //return StartMonitored(ctx, hosts, noAgentHosts, monitoredOptions, options.OptTimeout) - return nil + if TSMonitoredOptions == nil || !TSMonitoredOptions.TSMonitorEnabled { + return nil + } + + hosts := make([]string, 0, len(uniqueHosts)) + for host := range uniqueHosts { + hosts = append(hosts, host) + } + return StartMonitored(ctx, hosts, noAgentHosts, options.OptTimeout) } // Stop the cluster. @@ -94,6 +102,8 @@ func Stop( nodeFilter := set.NewStringSet(options.Nodes...) components := cluster.ComponentsByStopOrder() components = FilterComponent(components, roleFilter) + monitoredOptions := cluster.GetMonitoredOptions() + noAgentHosts := set.NewStringSet() instCount := map[string]int{} cluster.IterInstance(func(inst spec.Instance) { @@ -111,7 +121,133 @@ func Stop( if err != nil && !options.Force { return errors.WithMessagef(err, "failed to stop %s", comp.Name()) } + for _, inst := range insts { + //if !inst.IgnoreMonitorAgent() { + // instCount[inst.GetManageHost()]-- + //} + instCount[inst.GetManageHost()]-- + } + } + + if monitoredOptions == nil || !monitoredOptions.TSMonitorEnabled { + return nil + } + + hosts := make([]string, 0) + for host, count := range instCount { + if count != 0 { + continue + } + hosts = append(hosts, host) + } + + if err := StopMonitored(ctx, hosts, noAgentHosts, options.OptTimeout); err != nil && !options.Force { + return err + } + + return nil +} + +// Restart the cluster. 
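+// Restart is a full Stop followed by Start, so the ts-monitor agents are
+// brought down and back up together with the core components.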
+func Restart(
+	ctx context.Context,
+	cluster spec.Topology,
+	options Options,
+	tlsCfg *tls.Config,
+) error {
+	err := Stop(ctx, cluster, options)
+	if err != nil {
+		return errors.WithMessagef(err, "failed to stop")
+	}
+
+	err = Start(ctx, cluster, options, tlsCfg)
+	if err != nil {
+		return errors.WithMessagef(err, "failed to start")
+	}
+
+	return nil
+}
+
+// StartMonitored starts the ts-monitor service on the given hosts
+func StartMonitored(ctx context.Context, hosts []string, noAgentHosts set.StringSet, timeout uint64) error {
+	return systemctlMonitor(ctx, hosts, noAgentHosts, "start", timeout)
+}
+
+// StopMonitored stops the ts-monitor service on the given hosts
+func StopMonitored(ctx context.Context, hosts []string, noAgentHosts set.StringSet, timeout uint64) error {
+	return systemctlMonitor(ctx, hosts, noAgentHosts, "stop", timeout)
+}
+
+// RestartMonitored restarts the ts-monitor service on the given hosts
+func RestartMonitored(ctx context.Context, hosts []string, noAgentHosts set.StringSet, timeout uint64) error {
+	err := StopMonitored(ctx, hosts, noAgentHosts, timeout)
+	if err != nil {
+		return err
+	}
+
+	return StartMonitored(ctx, hosts, noAgentHosts, timeout)
+}
+
+// EnableMonitored enables or disables the monitor service in a cluster
+func EnableMonitored(ctx context.Context, hosts []string, noAgentHosts set.StringSet, timeout uint64, isEnable bool) error {
+	action := "disable"
+	if isEnable {
+		action = "enable"
+	}
+
+	return systemctlMonitor(ctx, hosts, noAgentHosts, action, timeout)
+}
+
+func systemctlMonitor(ctx context.Context, hosts []string, noAgentHosts set.StringSet, action string, timeout uint64) error {
+	logger := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger)
+	for _, comp := range []string{spec.ComponentTSMonitor} {
+		logger.Infof("%s component %s", actionPrevMsgs[action], comp)
+
+		errg, _ := errgroup.WithContext(ctx)
+		for _, host := range hosts {
+			host := host
+			if noAgentHosts.Exist(host) {
+				logger.Debugf("Ignored %s component %s for %s", action, comp, host)
+				continue
+			}
+			errg.Go(func() error {
+				logger.Infof("\t%s instance %s", actionPrevMsgs[action], host)
+				e := ctxt.GetInner(ctx).Get(host)
+				service := fmt.Sprintf("%s.service", comp)
+
+				if err := systemctl(ctx, e, service, action, timeout); err != nil {
+					return toFailedActionError(err, action, host, service, "")
+				}
+
+				logger.Infof("\t%s %s success", actionPostMsgs[action], host)
+				return nil
+			})
+		}
+		if err := errg.Wait(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+//lint:ignore U1000 keep this
+func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64, tlsCfg *tls.Config) error {
+	e := ctxt.GetInner(ctx).Get(ins.GetManageHost())
+	logger := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger)
+	logger.Infof("\tRestarting instance %s", ins.ID())
+
+	if err := systemctl(ctx, e, ins.ServiceName(), "restart", timeout); err != nil {
+		return toFailedActionError(err, "restart", ins.GetManageHost(), ins.ServiceName(), ins.LogDir())
+	}
+
+	// Check ready.
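+	// wait for the instance to become ready again before reporting success
+	// (Ready presumably polls the service until it responds within timeout)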
+	if err := ins.Ready(ctx, e, timeout, tlsCfg); err != nil {
+		return toFailedActionError(err, "restart", ins.GetManageHost(), ins.ServiceName(), ins.LogDir())
+	}
+
+	logger.Infof("\tRestart instance %s success", ins.ID())
+	return nil
 }
diff --git a/pkg/cluster/operation/uninstall.go b/pkg/cluster/operation/uninstall.go
index 7cf43cd..fd2a6f4 100644
--- a/pkg/cluster/operation/uninstall.go
+++ b/pkg/cluster/operation/uninstall.go
@@ -48,7 +48,18 @@ func Uninstall(
 		insts := com.Instances()
 		err := DestroyComponent(ctx, insts, cluster, options)
 		if err != nil && !options.Force {
-			return errors.WithMessagef(err, "failed to destroy %s", com.Name())
+			return errors.WithMessagef(err, "failed to uninstall %s", com.Name())
+		}
+
+		for _, inst := range insts {
+			instCount[inst.GetManageHost()]--
+			if instCount[inst.GetManageHost()] == 0 {
+				if cluster.GetMonitoredOptions() != nil && cluster.GetMonitoredOptions().TSMonitorEnabled {
+					if err = DestroyMonitored(ctx, inst, cluster.GetMonitoredOptions(), options.OptTimeout); err != nil && !options.Force {
+						return errors.WithStack(err)
+					}
+				}
+			}
 		}
 	}
 
@@ -150,6 +161,48 @@ func DeletePublicKey(ctx context.Context, host string) error {
 	return nil
 }
 
+// DestroyMonitored destroys the monitored service.
+func DestroyMonitored(ctx context.Context, inst spec.Instance, options *spec.TSMonitoredOptions, timeout uint64) error {
+	e := ctxt.GetInner(ctx).Get(inst.GetManageHost())
+	logger := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger)
+
+	logger.Infof("Uninstalling monitored %s", inst.GetManageHost())
+	logger.Infof("\tUninstalling instance %s", inst.GetManageHost())
+
+	// Remove the deployed files, logs and the systemd unit.
+	delPaths := make([]string, 0)
+
+	delPaths = append(delPaths, options.LogDir)
+
+	delPaths = append(delPaths, options.DeployDir)
+
+	delPaths = append(delPaths, fmt.Sprintf("/etc/systemd/system/%s.service", spec.ComponentTSMonitor))
+
+	c := module.ShellModuleConfig{
+		Command:  fmt.Sprintf("rm -rf %s;", strings.Join(delPaths, " ")),
+		Sudo:     true, // the .service files are in a directory owned by root
+		Chdir:    "",
+		UseShell: false,
+	}
+	shell := module.NewShellModule(c)
+	stdout, stderr, err := shell.Execute(ctx, e)
+
+	if len(stdout) > 0 {
+		fmt.Println(string(stdout))
+	}
+	if len(stderr) > 0 {
+		logger.Errorf(string(stderr))
+	}
+
+	if err != nil {
+		return errors.WithMessagef(err, "failed to uninstall monitored: %s", inst.GetManageHost())
+	}
+
+	logger.Infof("Uninstalling monitored on %s success", inst.GetManageHost())
+
+	return nil
+}
+
 // CleanupComponent cleanup the instances
 func CleanupComponent(ctx context.Context, delFileMaps map[string]set.StringSet) error {
 	logger := ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger)
diff --git a/pkg/cluster/spec/grafana.go b/pkg/cluster/spec/grafana.go
index 984df1a..c84ab63 100644
--- a/pkg/cluster/spec/grafana.go
+++ b/pkg/cluster/spec/grafana.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"reflect"
 	"strings"
 	"time"
 
@@ -198,7 +199,7 @@ func (i *GrafanaInstance) InitConfig(
 	}
 	err := mergeAdditionalGrafanaConf(fp, userConfig)
 	if err != nil {
-		return err
+		return errors.WithStack(err)
 	}
 
 	dst = filepath.Join(paths.Deploy, "conf", "grafana.ini")
@@ -221,28 +222,25 @@ func (i *GrafanaInstance) InitConfig(
 		return errors.WithStack(err)
 	}
 
-	//topo := reflect.ValueOf(i.topo)
-	//if topo.Kind() == reflect.Ptr {
-	//	topo = topo.Elem()
-	//}
 	// TODO: maybe should deploy a ts-server instance
-	//val := topo.FieldByName("Monitors")
-	//if (val == reflect.Value{}) {
-	//	return 
errors.Errorf("field Monitors not found in topology: %v", topo) - //} - //monitors := val.Interface().([]*PrometheusSpec) - //// transfer datasource.yml - //if len(monitors) == 0 { - // return errors.New("no prometheus found in topology") - //} + topo := reflect.ValueOf(i.topo) + if topo.Kind() == reflect.Ptr { + topo = topo.Elem() + } + val := topo.FieldByName("Monitors") + if (val == reflect.Value{}) { + return errors.Errorf("field Monitors not found in topology: %v", topo) + } + monitors := val.Interface().([]*TSServerSpec) + // transfer datasource.yml + if len(monitors) == 0 { + return errors.Errorf("no monitoring_servers found in topology") + } fp = filepath.Join(paths.Cache, fmt.Sprintf("datasource_%s.yml", i.GetHost())) datasourceCfg := &config.DatasourceConfig{ ClusterName: clusterName, } - if len(i.topo.(*Specification).TSSqlServers) > 0 { - sql0 := i.topo.(*Specification).TSSqlServers[0] - datasourceCfg.URL = fmt.Sprintf("http://%s", utils.JoinHostPort(sql0.Host, sql0.Port)) // TODO: monitor server address - } + monitor := monitors[0] + datasourceCfg.URL = fmt.Sprintf("http://%s", utils.JoinHostPort(monitor.Host, monitor.Port)) // TODO: https supported if err = datasourceCfg.ConfigToFile(fp); err != nil { return errors.WithStack(err) diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 5da830e..0ccc1bd 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -38,6 +38,7 @@ const ( ComponentTSMeta = "ts-meta" ComponentTSSql = "ts-sql" ComponentTSStore = "ts-store" + ComponentTSServer = "ts-server" ComponentTSMonitor = "ts-monitor" ComponentGrafana = "grafana" ) @@ -73,7 +74,7 @@ type Instance interface { DeployDir() string UsedPorts() []int UsedDirs() []string - Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string + Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, tsMetaList ...string) string Uptime(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration DataDir() string LogDir() string @@ -426,8 +427,8 @@ func (i *BaseInstance) UsedDirs() []string { } // Status implements Instance interface -func (i *BaseInstance) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { - return i.StatusFn(ctx, timeout, tlsCfg, pdList...) +func (i *BaseInstance) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, tsMetaList ...string) string { + return i.StatusFn(ctx, timeout, tlsCfg, tsMetaList...) } // Uptime implements Instance interface diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go new file mode 100644 index 0000000..899ad50 --- /dev/null +++ b/pkg/cluster/spec/monitoring.go @@ -0,0 +1,225 @@ +// Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package spec
+
+import (
+	"context"
+	"crypto/tls"
+	"fmt"
+	"path/filepath"
+	"time"
+
+	"github.com/openGemini/gemix/pkg/cluster/ctxt"
+	"github.com/openGemini/gemix/pkg/cluster/template/scripts"
+	"github.com/openGemini/gemix/pkg/meta"
+	"github.com/openGemini/gemix/pkg/utils"
+	"github.com/pkg/errors"
+)
+
+// TSServerSpec represents the ts-server topology specification in topology.yaml
+type TSServerSpec struct {
+	Arch string `yaml:"arch,omitempty"`
+	OS   string `yaml:"os,omitempty"`
+
+	Source string `yaml:"source,omitempty" validate:"source:editable"`
+
+	// Use Name to get the name with a default value if it's empty.
+	Name string `yaml:"name"`
+
+	Host       string `yaml:"host"`
+	ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
+	ListenHost string `yaml:"listen_host,omitempty"`
+	SSHPort    int    `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
+
+	LogDir    string `yaml:"log_dir,omitempty"`
+	DeployDir string `yaml:"deploy_dir,omitempty"`
+	DataDir   string `yaml:"data_dir,omitempty"`
+
+	// port specification
+	Port       int `yaml:"port" default:"8186"`
+	RaftPort   int `yaml:"raft_port" default:"8188"`
+	ClientPort int `yaml:"client_port" default:"8191"`
+	PeerPort   int `yaml:"peer_port" default:"8192"`
+	IngestPort int `yaml:"ingest_port" default:"8410"`
+	SelectPort int `yaml:"select_port" default:"8411"`
+
+	Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
+}
+
+func (s *TSServerSpec) SSH() (string, int) {
+	host := s.Host
+	if s.ManageHost != "" {
+		host = s.ManageHost
+	}
+	return host, s.SSHPort
+}
+
+// Role returns the component role of the instance
+func (s *TSServerSpec) Role() string {
+	return ComponentTSServer
+}
+
+// GetManageHost returns the manage host of the instance
+func (s *TSServerSpec) GetManageHost() string {
+	if s.ManageHost != "" {
+		return s.ManageHost
+	}
+	return s.Host
+}
+
+// GetSource returns source to download the component
+func (s *TSServerSpec) GetSource() string {
+	if s.Source == "" {
+		return ComponentOpenGemini
+	}
+	return s.Source
+}
+
+// TSServerComponent represents the ts-server component.
+type TSServerComponent struct{ Topology }
+
+// Name implements Component interface.
+func (c *TSServerComponent) Name() string {
+	return ComponentTSServer
+}
+
+// Role implements Component interface.
+func (c *TSServerComponent) Role() string {
+	return ComponentTSServer
+}
+
+// Instances implements Component interface.
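+//
+// Each entry under `monitoring_servers` becomes one Instance; all six service
+// ports and the deploy/log/data dirs are registered on the BaseInstance so
+// that port/directory conflict checks and status probes cover the ts-server.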
+func (c *TSServerComponent) Instances() []Instance {
+	servers := c.BaseTopo().Monitors
+	ins := make([]Instance, 0, len(servers))
+
+	for _, s := range servers {
+		s := s
+		ins = append(ins, &TSServerInstance{
+			BaseInstance: BaseInstance{
+				InstanceSpec: s,
+				Name:         c.Name(),
+				Host:         s.Host,
+				ManageHost:   s.ManageHost,
+				ListenHost:   c.Topology.BaseTopo().GlobalOptions.ListenHost,
+				Port:         s.Port,
+				SSHP:         s.SSHPort,
+				Source:       s.GetSource(),
+
+				Ports: []int{
+					s.Port,
+					s.RaftPort,
+					s.ClientPort,
+					s.PeerPort,
+					s.IngestPort,
+					s.SelectPort,
+				},
+				Dirs: []string{
+					s.DeployDir,
+					s.LogDir,
+					s.DataDir,
+				},
+				StatusFn: func(_ context.Context, timeout time.Duration, _ *tls.Config, _ ...string) string {
+					return statusByHost(s.GetManageHost(), s.Port, "/status", timeout, nil)
+				},
+				UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
+					return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
+				},
+			},
+			topo: c.Topology,
+		})
+	}
+	return ins
+}
+
+// TSServerInstance represents the ts-server instance
+type TSServerInstance struct {
+	BaseInstance
+	topo Topology
+}
+
+func (i *TSServerInstance) InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error {
+	//topo := i.topo.(*Specification)
+	gOpts := *i.topo.BaseTopo().GlobalOptions
+	if err := i.BaseInstance.InitConfig(ctx, e, gOpts, deployUser, paths); err != nil {
+		return err
+	}
+
+	//enableTLS := topo.GlobalOptions.TLSEnabled
+	spec := i.InstanceSpec.(*TSServerSpec)
+
+	cfg := &scripts.TSServerScript{
+		DeployDir: paths.Deploy,
+		LogDir:    paths.Log,
+	}
+
+	fp := filepath.Join(paths.Cache, fmt.Sprintf("run_ts_server_%s_%d.sh", i.GetHost(), i.GetPort()))
+	if err := cfg.ConfigToFile(fp); err != nil {
+		return errors.WithStack(err)
+	}
+	dst := filepath.Join(paths.Deploy, "scripts", "run_ts-server.sh")
+
+	if err := e.Transfer(ctx, fp, dst, false, 0, false); err != nil {
+		return errors.WithStack(err)
+	}
+
+	_, _, err := e.Execute(ctx, "chmod +x "+dst, false)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	//globalConfig := topo.ServerConfigs.TsStore
+
+	// set TLS configs
+	//spec.Config, err = i.setTLSConfig(ctx, enableTLS, spec.Config, paths)
+	//if err != nil {
+	//	return err
+	//}
+
+	configs := i.SetDefaultConfig(spec.Config)
+
+	if err = i.MergeServerConfig(ctx, e, nil, configs, paths); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".toml", paths, nil)
+}
+
+func (i *TSServerInstance) SetDefaultConfig(instanceConf map[string]any) map[string]any {
+	if instanceConf == nil {
+		instanceConf = make(map[string]any, 20)
+	}
+	var serverSpec = i.InstanceSpec.(*TSServerSpec)
+
+	instanceConf["common.meta-join"] = []string{utils.JoinHostPort(serverSpec.Host, serverSpec.PeerPort)}
+	instanceConf["meta.bind-address"] = utils.JoinHostPort(i.Host, serverSpec.RaftPort)
+	instanceConf["meta.http-bind-address"] = utils.JoinHostPort(i.Host, serverSpec.ClientPort)
+	instanceConf["meta.rpc-bind-address"] = utils.JoinHostPort(i.Host, serverSpec.PeerPort)
+	instanceConf["meta.dir"] = filepath.Join(serverSpec.DataDir, "meta")
+	instanceConf["data.store-ingest-addr"] = utils.JoinHostPort(i.Host, serverSpec.IngestPort)
+	instanceConf["data.store-select-addr"] = utils.JoinHostPort(i.Host, serverSpec.SelectPort)
+	instanceConf["data.store-data-dir"] = serverSpec.DataDir
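+	// the all-in-one ts-server keeps its data files and WAL under the same DataDir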
+	instanceConf["data.store-wal-dir"] = serverSpec.DataDir
+	instanceConf["http.bind-address"] = utils.JoinHostPort(i.Host, serverSpec.Port)
+	instanceConf["logging.path"] = serverSpec.LogDir
+
+	// monitor
+	if i.topo.(*Specification).MonitoredOptions.TSMonitorEnabled {
+		instanceConf["monitor.pushers"] = "file"
+		instanceConf["monitor.store-enabled"] = false //TODO
+		instanceConf["monitor.store-path"] = filepath.Join(serverSpec.LogDir, "metric", "server-metric.data")
+	}
+	return instanceConf
+}
diff --git a/pkg/cluster/spec/parse_topology.go b/pkg/cluster/spec/parse_topology.go
index 39af0fa..e1f5c7a 100644
--- a/pkg/cluster/spec/parse_topology.go
+++ b/pkg/cluster/spec/parse_topology.go
@@ -16,7 +16,9 @@ package spec
 
 import (
 	"os"
+	"path"
 	"path/filepath"
+	"reflect"
 	"strings"
 
 	"github.com/joomcode/errorx"
@@ -65,11 +67,11 @@ func ReadFromYaml(file string) (*Specification, error) {
 
 	// TODO: check required options
 	//if pass := checkRequiredOptions(yamlSpec); !pass {
-	//	return nil, errors.New("missing requitred options for yaml configuration file")
+	//	return nil, errors.New("missing required options for yaml configuration file")
 	//}
 
 	// TODO: Update with default values
-	//updataWithGlobalDefaults(&yamlSpec)
+	//updateWithGlobalDefaults(&yamlSpec)
 
 	return yamlSpec, nil
 }
@@ -133,55 +135,59 @@ func ExpandRelativeDir(topo Topology) {
 	expandRelativePath(deployUser(topo), topo)
 }
 
-func expandRelativePath(user string, topology Topology) {
-	topo := topology.(*Specification)
-	topo.GlobalOptions.DeployDir = Abs(user, topo.GlobalOptions.DeployDir)
-	topo.GlobalOptions.LogDir = Abs(user, topo.GlobalOptions.LogDir)
+func expandRelativePath(user string, topo any) {
+	v := reflect.Indirect(reflect.ValueOf(topo).Elem())
 
-	// set ts-monitor default deploy directory
-	if strings.TrimSpace(topo.MonitoredOptions.DeployDir) == "" {
-		topo.MonitoredOptions.DeployDir = topo.GlobalOptions.DeployDir
-	} else {
-		topo.MonitoredOptions.DeployDir = Abs(user, topo.MonitoredOptions.DeployDir)
-	}
-
-	// set ts-monitor default log directory
-	if strings.TrimSpace(topo.MonitoredOptions.LogDir) == "" {
-		topo.MonitoredOptions.LogDir = topo.GlobalOptions.LogDir
-	} else {
-		topo.MonitoredOptions.LogDir = Abs(user, topo.MonitoredOptions.LogDir)
-	}
-
-	for i := range topo.TSMetaServers {
-		server := topo.TSMetaServers[i]
-		server.DeployDir = Abs(user, server.DeployDir)
-		server.LogDir = Abs(user, server.LogDir)
-		server.DataDir = Abs(user, server.DataDir)
-	}
-
-	for i := range topo.TSSqlServers {
-		server := topo.TSSqlServers[i]
-		server.DeployDir = Abs(user, server.DeployDir)
-		server.LogDir = Abs(user, server.LogDir)
-	}
-
-	for i := range topo.TSStoreServers {
-		server := topo.TSStoreServers[i]
-		server.DeployDir = Abs(user, server.DeployDir)
-		server.LogDir = Abs(user, server.LogDir)
-		server.DataDir = Abs(user, server.DataDir)
-	}
-
-	for i := range topo.TSMonitorServers {
-		server := topo.TSMonitorServers[i]
-		server.DeployDir = Abs(user, server.DeployDir)
-		server.LogDir = Abs(user, server.LogDir)
-	}
-
-	for i := range topo.Grafanas {
-		server := topo.Grafanas[i]
-		server.DeployDir = Abs(user, server.DeployDir)
-		server.DashboardDir = Abs(user, server.DashboardDir)
+	switch v.Kind() {
+	case reflect.Slice:
+		for i := 0; i < v.Len(); i++ {
+			ref := reflect.New(v.Index(i).Type())
+			ref.Elem().Set(v.Index(i))
+			expandRelativePath(user, ref.Interface())
+			v.Index(i).Set(ref.Elem())
+		}
+	case reflect.Struct:
+		// We should deal with DeployDir first, because DataDir and LogDir depend on it
+		dirs := []string{"DeployDir", "DataDir", "LogDir"}
[]string{"DeployDir", "DataDir", "LogDir"} + for _, dir := range dirs { + f := v.FieldByName(dir) + if !f.IsValid() || f.String() == "" { + continue + } + switch dir { + case "DeployDir": + f.SetString(Abs(user, f.String())) + case "DataDir": + // Some components supports multiple data dirs split by comma + ds := strings.Split(f.String(), ",") + ads := []string{} + for _, d := range ds { + if strings.HasPrefix(d, "/") { + ads = append(ads, d) + } else { + ads = append(ads, path.Join(v.FieldByName("DeployDir").String(), d)) + } + } + f.SetString(strings.Join(ads, ",")) + case "LogDir": + if !strings.HasPrefix(f.String(), "/") { + f.SetString(path.Join(v.FieldByName("DeployDir").String(), f.String())) + } + } + } + // Deal with all fields (expandRelativePath will do nothing on string filed) + for i := 0; i < v.NumField(); i++ { + // We don't deal with GlobalOptions because relative path in GlobalOptions.Data has special meaning + if v.Type().Field(i).Name == "GlobalOptions" { + continue + } + ref := reflect.New(v.Field(i).Type()) + ref.Elem().Set(v.Field(i)) + expandRelativePath(user, ref.Interface()) + v.Field(i).Set(ref.Elem()) + } + case reflect.Ptr: + expandRelativePath(user, v.Interface()) } } diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index 3f4e6fe..c50202d 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -37,11 +37,16 @@ const ( FullOSType FullHostType = "OS" ) +var ( + RoleMonitor = "monitor" +) + type ( // InstanceSpec represent a instance specification InstanceSpec interface { Role() string SSH() (string, int) + //IgnoreMonitorAgent() bool } // GlobalOptions represents the global options for all groups in topology @@ -51,6 +56,7 @@ type ( Group string `yaml:"group,omitempty"` SSHPort int `yaml:"ssh_port,omitempty" default:"22" validate:"ssh_port:editable"` TLSEnabled bool `yaml:"enable_tls,omitempty"` + ListenHost string `yaml:"listen_host,omitempty" validate:"listen_host:editable"` DeployDir string `yaml:"deploy_dir,omitempty" default:"deploy"` LogDir string `yaml:"log_dir,omitempty" default:"logs"` DataDir string `yaml:"data_dir,omitempty" default:"data"` @@ -84,10 +90,9 @@ type ( TSMetaServers []*TSMetaSpec `yaml:"ts_meta_servers"` TSSqlServers []*TSSqlSpec `yaml:"ts_sql_servers"` TSStoreServers []*TSStoreSpec `yaml:"ts_store_servers"` - TSMonitorServers []*TSMonitorSpec `yaml:"ts_monitor_servers,omitempty"` + Monitors []*TSServerSpec `yaml:"monitoring_servers"` Grafanas []*GrafanaSpec `yaml:"grafana_servers,omitempty"` //DashboardServers []*DashboardSpec `yaml:"opengemini_dashboard_servers,omitempty"` - //Monitors []*PrometheusSpec `yaml:"monitoring_servers"` } ) @@ -102,6 +107,7 @@ type Topology interface { ComponentsByStopOrder() []Component //ComponentsByUpdateOrder(curVer string) []Component IterInstance(fn func(instance Instance), concurrency ...int) + GetMonitoredOptions() *TSMonitoredOptions CountDir(host string, dir string) int // count how many time a path is used by instances in cluster //TLSConfig(dir string) (*tls.Config, error) //Merge(that Topology) Topology // TODO: for update @@ -110,10 +116,13 @@ type Topology interface { } type BaseTopo struct { - GlobalOptions *GlobalOptions - MasterList []string + GlobalOptions *GlobalOptions + MonitoredOptions *TSMonitoredOptions + MasterList []string - Grafanas []*GrafanaSpec + GrafanaVersion *string + Monitors []*TSServerSpec + Grafanas []*GrafanaSpec } // BaseMeta is the base info of metadata. 
@@ -140,6 +149,11 @@ func AllComponentNames() (roles []string) { return } +// GetMonitoredOptions implements Topology interface. +func (s *Specification) GetMonitoredOptions() *TSMonitoredOptions { + return &s.MonitoredOptions +} + // UnmarshalYAML implements the yaml.Unmarshaler interface, // it sets the default values when unmarshaling the topology file func (s *Specification) UnmarshalYAML(unmarshal func(any) error) error { @@ -153,14 +167,24 @@ func (s *Specification) UnmarshalYAML(unmarshal func(any) error) error { return errors.WithStack(err) } + // Set monitored options + if s.MonitoredOptions.DeployDir == "" { + s.MonitoredOptions.DeployDir = filepath.Join(s.GlobalOptions.DeployDir, RoleMonitor) + } + if s.MonitoredOptions.LogDir == "" { + s.MonitoredOptions.LogDir = "logs" + } + if !strings.HasPrefix(s.MonitoredOptions.LogDir, "/") && + !strings.HasPrefix(s.MonitoredOptions.LogDir, s.MonitoredOptions.DeployDir) { + s.MonitoredOptions.LogDir = filepath.Join(s.MonitoredOptions.DeployDir, s.MonitoredOptions.LogDir) + } + // populate custom default values as needed if err := fillCustomDefaults(&s.GlobalOptions, s); err != nil { return err } - // TODO: validate yaml configs - //return s.Validate() - return nil + return s.Validate() } // fillDefaults tries to fill custom fields to their default values @@ -335,11 +359,48 @@ func setCustomDefaults(globalOptions *GlobalOptions, field reflect.Value) error return nil } +func findField(v reflect.Value, fieldName string) (int, bool) { + for i := 0; i < reflect.Indirect(v).NumField(); i++ { + if reflect.Indirect(v).Type().Field(i).Name == fieldName { + return i, true + } + } + return -1, false +} + +//lint:ignore U1000 keep this +func findSliceField(v Topology, fieldName string) (reflect.Value, bool) { + topo := reflect.ValueOf(v) + if topo.Kind() == reflect.Ptr { + topo = topo.Elem() + } + + j, found := findField(topo, fieldName) + if found { + val := topo.Field(j) + if val.Kind() == reflect.Slice || val.Kind() == reflect.Array { + return val, true + } + } + return reflect.Value{}, false +} + +// GetTsMetaList returns a list of ts-meta API hosts of the current cluster +func (s *Specification) GetTsMetaList() []string { + var tsMetaList []string + + for _, m := range s.TSMetaServers { + tsMetaList = append(tsMetaList, utils.JoinHostPort(m.Host, m.ClientPort)) + } + + return tsMetaList +} + // GetTSMetaListWithManageHost returns a list of ts-meta API hosts of the current cluster func (s *Specification) GetTSMetaListWithManageHost() []string { var tsMetaList []string - for _, meta := range s.TSMetaServers { - tsMetaList = append(tsMetaList, utils.JoinHostPort(meta.GetManageHost(), meta.ClientPort)) + for _, m := range s.TSMetaServers { + tsMetaList = append(tsMetaList, utils.JoinHostPort(m.GetManageHost(), m.ClientPort)) } return tsMetaList @@ -348,9 +409,11 @@ func (s *Specification) GetTSMetaListWithManageHost() []string { // BaseTopo implements Specification interface. func (s *Specification) BaseTopo() *BaseTopo { return &BaseTopo{ - GlobalOptions: &s.GlobalOptions, - MasterList: s.GetTSMetaListWithManageHost(), - Grafanas: s.Grafanas, + GlobalOptions: &s.GlobalOptions, + MonitoredOptions: s.GetMonitoredOptions(), + MasterList: s.GetTSMetaListWithManageHost(), + Monitors: s.Monitors, + Grafanas: s.Grafanas, } } @@ -370,12 +433,12 @@ func (s *Specification) ComponentsByStopOrder() (comps []Component) { // ComponentsByStartOrder return component in the order need to start. 
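+// ts-meta is listed first since the other components depend on the meta
+// service being reachable; the ts-server based monitor and grafana come last.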
func (s *Specification) ComponentsByStartOrder() (comps []Component) { - // "ts-meta", "ts-store", "ts-sql", "ts-data", "ts-monitor", "grafana" + // "ts-meta", "ts-store", "ts-sql", "ts-data", "monitor(ts-server)", "grafana" comps = append(comps, &TSMetaComponent{Topology: s}) comps = append(comps, &TSStoreComponent{Topology: s}) comps = append(comps, &TSSqlComponent{Topology: s}) //comps = append(comps, &TSDataComponent{s}) - comps = append(comps, &TSMonitorComponent{Topology: s}) + comps = append(comps, &TSServerComponent{Topology: s}) comps = append(comps, &GrafanaComponent{Topology: s}) return } @@ -412,6 +475,21 @@ func (s *Specification) IterInstance(fn func(instance Instance), concurrency ... wg.Wait() } +// IterHost iterates one instance for each host +func IterHost(topo Topology, fn func(instance Instance)) { + hostMap := make(map[string]bool) + for _, comp := range topo.ComponentsByStartOrder() { + for _, inst := range comp.Instances() { + host := inst.GetHost() + _, ok := hostMap[host] + if !ok { + hostMap[host] = true + fn(inst) + } + } + } +} + // FillHostArchOrOS fills the topology with the given host->arch func (s *Specification) FillHostArchOrOS(hostArchOrOS map[string]string, fullType FullHostType) error { if err := FillHostArchOrOS(s, hostArchOrOS, fullType); err != nil { @@ -434,60 +512,110 @@ func FillHostArchOrOS(s *Specification, hostArchOrOS map[string]string, fullType } } - if fullType == FullOSType { - for _, server := range s.TSMetaServers { - if server.OS == "" { - server.OS = hostArchOrOS[server.Host] - } - } - for _, server := range s.TSSqlServers { - if server.OS == "" { - server.OS = hostArchOrOS[server.Host] - } - } - for _, server := range s.TSStoreServers { - if server.OS == "" { - server.OS = hostArchOrOS[server.Host] - } - } - for _, server := range s.TSMonitorServers { - if server.OS == "" { - server.OS = hostArchOrOS[server.Host] - } + v := reflect.ValueOf(s).Elem() + t := v.Type() + + for i := 0; i < t.NumField(); i++ { + field := v.Field(i) + if field.Kind() != reflect.Slice { + continue } - for _, server := range s.Grafanas { - if server.OS == "" { - server.OS = hostArchOrOS[server.Host] + for j := 0; j < field.Len(); j++ { + if err := setHostArchOrOS(field.Index(j), hostArchOrOS, fullType); err != nil { + return err } } } - if fullType == FullArchType { - for _, server := range s.TSMetaServers { - if server.Arch == "" { - server.Arch = hostArchOrOS[server.Host] - } - } - for _, server := range s.TSSqlServers { - if server.Arch == "" { - server.Arch = hostArchOrOS[server.Host] - } - } - for _, server := range s.TSStoreServers { - if server.Arch == "" { - server.Arch = hostArchOrOS[server.Host] - } - } - for _, server := range s.TSMonitorServers { - if server.Arch == "" { - server.Arch = hostArchOrOS[server.Host] - } + + //if fullType == FullOSType { + // for _, server := range s.TSMetaServers { + // if server.OS == "" { + // server.OS = hostArchOrOS[server.Host] + // } + // } + // for _, server := range s.TSSqlServers { + // if server.OS == "" { + // server.OS = hostArchOrOS[server.Host] + // } + // } + // for _, server := range s.TSStoreServers { + // if server.OS == "" { + // server.OS = hostArchOrOS[server.Host] + // } + // } + // for _, server := range s.Monitors { + // if server.OS == "" { + // server.OS = hostArchOrOS[server.Host] + // } + // } + // for _, server := range s.Grafanas { + // if server.OS == "" { + // server.OS = hostArchOrOS[server.Host] + // } + // } + //} + //if fullType == FullArchType { + // for _, server := range 
s.TSMetaServers {
+	//		if server.Arch == "" {
+	//			server.Arch = hostArchOrOS[server.Host]
+	//		}
+	//	}
+	//	for _, server := range s.TSSqlServers {
+	//		if server.Arch == "" {
+	//			server.Arch = hostArchOrOS[server.Host]
+	//		}
+	//	}
+	//	for _, server := range s.TSStoreServers {
+	//		if server.Arch == "" {
+	//			server.Arch = hostArchOrOS[server.Host]
+	//		}
+	//	}
+	//	for _, server := range s.Monitors {
+	//		if server.Arch == "" {
+	//			server.Arch = hostArchOrOS[server.Host]
+	//		}
+	//	}
+	//	for _, server := range s.Grafanas {
+	//		if server.Arch == "" {
+	//			server.Arch = hostArchOrOS[server.Host]
+	//		}
+	//	}
+	//}
+	return nil
+}
+
+func setHostArchOrOS(field reflect.Value, hostArchOrOS map[string]string, fullType FullHostType) error {
+	if !field.CanSet() || isSkipField(field) {
+		return nil
+	}
+
+	if field.Kind() == reflect.Ptr {
+		return setHostArchOrOS(field.Elem(), hostArchOrOS, fullType)
+	}
+
+	if field.Kind() != reflect.Struct {
+		return nil
+	}
+
+	host := field.FieldByName("Host")
+	if field.FieldByName("ManageHost").String() != "" {
+		host = field.FieldByName("ManageHost")
+	}
+
+	arch := field.FieldByName("Arch")
+	os := field.FieldByName("OS")
+
+	// set OS or arch only if not set before
+	if fullType == FullOSType {
+		if !host.IsZero() && os.CanSet() && len(os.String()) == 0 {
+			os.Set(reflect.ValueOf(hostArchOrOS[host.String()]))
+		}
+	} else {
+		if !host.IsZero() && arch.CanSet() && len(arch.String()) == 0 {
+			arch.Set(reflect.ValueOf(hostArchOrOS[host.String()]))
+		}
+	}
+	return nil
+}
diff --git a/pkg/cluster/spec/spec_manager.go b/pkg/cluster/spec/spec_manager.go
index 7c2bd5a..a09398f 100644
--- a/pkg/cluster/spec/spec_manager.go
+++ b/pkg/cluster/spec/spec_manager.go
@@ -108,12 +108,12 @@ func (s *SpecManager) Metadata(clusterName string, meta any) error {
 
 	yamlFile, err := os.ReadFile(fname)
 	if err != nil {
-		return errors.WithStack(err)
+		return errors.WithMessagef(err, "failed to read metadata of cluster %s", clusterName)
 	}
 
 	err = yaml.Unmarshal(yamlFile, meta)
 	if err != nil {
-		return errors.WithStack(err)
+		return errors.WithMessagef(err, "failed to unmarshal metadata of cluster %s", clusterName)
 	}
 
 	return nil
@@ -139,6 +139,45 @@ func (s *SpecManager) Remove(clusterName string) error {
 	return os.RemoveAll(s.Path(clusterName))
 }
 
+// List returns the cluster names.
+func (s *SpecManager) List() (clusterNames []string, err error) {
+	fileInfos, err := os.ReadDir(s.base)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
+		return nil, errors.WithStack(err)
+	}
+
+	for _, info := range fileInfos {
+		if utils.IsNotExist(s.Path(info.Name(), metaFileName)) {
+			continue
+		}
+		clusterNames = append(clusterNames, info.Name())
+	}
+
+	return
+}
+
+// GetAllClusters gets a metadata list of all clusters deployed by current user
+func (s *SpecManager) GetAllClusters() (map[string]Metadata, error) {
+	clusters := make(map[string]Metadata)
+	names, err := s.List()
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+	for _, name := range names {
+		metadata := s.NewMetadata()
+		err = s.Metadata(name, metadata)
+		// TODO: clusters with topology validation errors should also be
+		// listed; for now any metadata load failure aborts the listing
+		if err != nil {
+			return nil, errors.WithStack(err)
+		}
+		clusters[name] = metadata
+	}
+	return clusters, nil
+}
+
 // ensureDir ensures that the cluster directory exists.
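+// The directory is created with mode 0750, keeping cluster metadata readable
+// only by the deploy user and its group.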
func (s *SpecManager) ensureDir(clusterName string) error { if err := os.MkdirAll(s.Path(clusterName), 0750); err != nil { diff --git a/pkg/cluster/spec/spec_test.go b/pkg/cluster/spec/spec_test.go new file mode 100644 index 0000000..7bbb355 --- /dev/null +++ b/pkg/cluster/spec/spec_test.go @@ -0,0 +1,58 @@ +// Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spec + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" +) + +func TestGlobalOptions(t *testing.T) { + topo := Specification{} + err := yaml.Unmarshal([]byte(` +global: + user: "test1" + ssh_port: 220 + deploy_dir: "test-deploy" + data_dir: "test-data" +ts_meta_servers: + - host: 172.16.5.138 + deploy_dir: "gemini-deploy" +ts_store_servers: + - host: 172.16.5.53 + data_dir: "ts-store-data" +ts_sql_servers: + - host: 172.16.5.233 + data_dir: "ts-sql-data" +`), &topo) + assert.NoError(t, err) + assert.Equal(t, "test1", topo.GlobalOptions.User) + assert.Equal(t, 220, topo.GlobalOptions.SSHPort) + + assert.Equal(t, 220, topo.TSMetaServers[0].SSHPort) + assert.Equal(t, "gemini-deploy", topo.TSMetaServers[0].DeployDir) + assert.Equal(t, "test-data", topo.TSMetaServers[0].DataDir) + + assert.Equal(t, 220, topo.TSStoreServers[0].SSHPort) + assert.Equal(t, filepath.Join("test-deploy", "ts-store-8401"), topo.TSStoreServers[0].DeployDir) + assert.Equal(t, "ts-store-data", topo.TSStoreServers[0].DataDir) + + assert.Equal(t, 220, topo.TSSqlServers[0].SSHPort) + assert.Equal(t, filepath.Join("test-deploy", "ts-sql-8086"), topo.TSSqlServers[0].DeployDir) + assert.Equal(t, filepath.Join("logs"), topo.TSSqlServers[0].LogDir) +} diff --git a/pkg/cluster/spec/ts_meta.go b/pkg/cluster/spec/ts_meta.go index df0976e..5800087 100644 --- a/pkg/cluster/spec/ts_meta.go +++ b/pkg/cluster/spec/ts_meta.go @@ -16,8 +16,10 @@ package spec import ( "context" + "crypto/tls" "fmt" "path/filepath" + "time" "github.com/openGemini/gemix/pkg/cluster/ctxt" "github.com/openGemini/gemix/pkg/cluster/template/scripts" @@ -54,6 +56,33 @@ type TSMetaSpec struct { Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"` } +// Status queries current status of the instance +func (s *TSMetaSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { + //if timeout < time.Second { + // timeout = statusQueryTimeout + //} + + //addr := utils.JoinHostPort(s.GetManageHost(), s.ClientPort) + //pc := api.NewPDClient(ctx, []string{addr}, timeout, tlsCfg) + // + //// check health + //err := pc.CheckHealth() + //if err != nil { + // return "Down" + //} + // + //// find leader node + //leader, err := pc.GetLeader() + //if err != nil { + // return "ERR" + //} + res := "Up" + //if s.Name == leader.Name { + // res += "|L" + //} + return res +} + func (s *TSMetaSpec) SSH() (string, int) { host := s.Host if s.ManageHost != "" { @@ -108,7 +137,7 @@ func (c *TSMetaComponent) Instances() []Instance { Name: c.Name(), 
Host: s.Host, ManageHost: s.ManageHost, - ListenHost: s.ListenHost, + ListenHost: utils.Ternary(s.ListenHost != "", s.ListenHost, c.Topology.BaseTopo().GlobalOptions.ListenHost).(string), Port: s.ClientPort, SSHP: s.SSHPort, Source: s.GetSource(), @@ -124,10 +153,10 @@ func (c *TSMetaComponent) Instances() []Instance { s.LogDir, s.DataDir, }, - //StatusFn: s.Status, - //UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { - // return UptimeByHost(s.GetManageHost(), s.ClientPort, timeout, tlsCfg) - //}, + StatusFn: s.Status, + UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { + return UptimeByHost(s.GetManageHost(), s.ClientPort, timeout, tlsCfg) + }, }, topo: c.Topology, }) diff --git a/pkg/cluster/spec/ts_monitor.go b/pkg/cluster/spec/ts_monitor.go index acde0c4..0e96434 100644 --- a/pkg/cluster/spec/ts_monitor.go +++ b/pkg/cluster/spec/ts_monitor.go @@ -28,6 +28,18 @@ import ( "github.com/pkg/errors" ) +type MonitorHostInfo struct { + Ssh int // ssh port of host + Os string // operating system + Arch string // cpu architecture + MetricPath string + ErrorLogPath string + DataPath string + WALPath string + ProcessName set.StringSet + MonitorAddr string +} + // TSMonitorSpec represents the ts_monitor_enabled topology specification in topology.yaml type TSMonitorSpec struct { Arch string `yaml:"arch,omitempty"` @@ -226,31 +238,6 @@ func (c *TSMonitorComponent) Instances() []Instance { }) } - for _, s := range c.Topology.TSMonitorServers { - if instanceSet.Exist(s.Host) { - continue - } - s := s - instanceSet.Insert(s.Host) - ins = append(ins, &TSMonitorInstance{ - Name: c.Name(), - BaseInstance: BaseInstance{ - InstanceSpec: s, - Name: c.Name(), - Host: s.Host, - ManageHost: s.ManageHost, - ListenHost: s.ListenHost, - SSHP: s.SSHPort, - Source: s.GetSource(), - Dirs: []string{ - s.DeployDir, - s.LogDir, - }, - }, - topo: c.Topology, - }) - } - for _, mo := range ins { var procs []string for proc := range mo.(*TSMonitorInstance).BaseInstance.InstanceSpec.(*TSMonitorSpec).MonitorProcess { diff --git a/pkg/cluster/spec/ts_sql.go b/pkg/cluster/spec/ts_sql.go index 83abd12..7fd5a18 100644 --- a/pkg/cluster/spec/ts_sql.go +++ b/pkg/cluster/spec/ts_sql.go @@ -34,7 +34,8 @@ type TSSqlSpec struct { Source string `yaml:"source,omitempty" validate:"source:editable"` // Use Name to get the name with a default value if it's empty. 
-	Name string `yaml:"name"`
+	Name           string `yaml:"name"`
+	IgnoreExporter bool   `yaml:"ignore_exporter,omitempty"`
 
 	Host       string `yaml:"host"`
 	ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
@@ -64,6 +65,11 @@ func (s *TSSqlSpec) Role() string {
 	return ComponentTSSql
 }
 
+// IgnoreMonitorAgent returns whether the node should be skipped by monitor agents
+func (s *TSSqlSpec) IgnoreMonitorAgent() bool {
+	return s.IgnoreExporter
+}
+
 // GetManageHost returns the manage host of the instance
 func (s *TSSqlSpec) GetManageHost() string {
 	if s.ManageHost != "" {
@@ -105,7 +111,7 @@ func (c *TSSqlComponent) Instances() []Instance {
 			Name:       c.Name(),
 			Host:       s.Host,
 			ManageHost: s.ManageHost,
-			ListenHost: s.ListenHost,
+			ListenHost: utils.Ternary(s.ListenHost != "", s.ListenHost, c.Topology.BaseTopo().GlobalOptions.ListenHost).(string),
 			Port:       s.Port,
 			SSHP:       s.SSHPort,
 			Source:     s.GetSource(),
diff --git a/pkg/cluster/spec/ts_store.go b/pkg/cluster/spec/ts_store.go
index 3ad3afe..fc24c38 100644
--- a/pkg/cluster/spec/ts_store.go
+++ b/pkg/cluster/spec/ts_store.go
@@ -107,7 +107,7 @@ func (c *TSStoreComponent) Instances() []Instance {
 			Name:       c.Name(),
 			Host:       s.Host,
 			ManageHost: s.ManageHost,
-			ListenHost: s.ListenHost,
+			ListenHost: utils.Ternary(s.ListenHost != "", s.ListenHost, c.Topology.BaseTopo().GlobalOptions.ListenHost).(string),
 			Port:       s.SelectPort, // do not change me
 			SSHP:       s.SSHPort,
 			Source:     s.GetSource(),
diff --git a/pkg/cluster/spec/validate.go b/pkg/cluster/spec/validate.go
index 03cc578..303bb36 100644
--- a/pkg/cluster/spec/validate.go
+++ b/pkg/cluster/spec/validate.go
@@ -12,21 +12,608 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package spec
 
 import (
+	"fmt"
 	"path/filepath"
 	"reflect"
+	"regexp"
+	"strconv"
 	"strings"
+
+	"github.com/openGemini/gemix/pkg/gui"
+	"github.com/openGemini/gemix/pkg/meta"
+	"github.com/openGemini/gemix/pkg/set"
+	"github.com/openGemini/gemix/pkg/utils"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+)
+
+// pre defined error types
+var (
+	errNSDeploy           = errNS.NewSubNamespace("deploy")
+	errDeployDirConflict  = errNSDeploy.NewType("dir_conflict", utils.ErrTraitPreCheck)
+	errDeployDirOverlap   = errNSDeploy.NewType("dir_overlap", utils.ErrTraitPreCheck)
+	errDeployPortConflict = errNSDeploy.NewType("port_conflict", utils.ErrTraitPreCheck)
+	ErrUserOrGroupInvalid = errors.New(`linux username and groupname must start with a lower case letter or an underscore, ` +
+		`followed by lower case letters, digits, underscores, or dashes. ` +
+		`Usernames may only be up to 32 characters long. 
` + + `Groupnames may only be up to 16 characters long.`) ) -func findField(v reflect.Value, fieldName string) (int, bool) { - for i := 0; i < reflect.Indirect(v).NumField(); i++ { - if reflect.Indirect(v).Type().Field(i).Name == fieldName { - return i, true +// Linux username and groupname must start with a lower case letter or an underscore, +// followed by lower case letters, digits, underscores, or dashes. +// ref https://man7.org/linux/man-pages/man8/useradd.8.html +// ref https://man7.org/linux/man-pages/man8/groupadd.8.html +var ( + reUser = regexp.MustCompile(`^[a-z_]([a-z0-9_-]{0,31}|[a-z0-9_-]{0,30}\$)$`) + reGroup = regexp.MustCompile(`^[a-z_]([a-z0-9_-]{0,15})$`) +) + +func fixDir(topo Topology) func(string) string { + return func(dir string) string { + if dir != "" { + return Abs(topo.BaseTopo().GlobalOptions.User, dir) } + return dir + } +} + +// DirAccessor stands for a directory accessor for an instance +type DirAccessor struct { + dirKind string + accessor func(Instance, Topology) string +} + +func dirAccessors() ([]DirAccessor, []DirAccessor) { + instanceDirAccessor := []DirAccessor{ + {dirKind: "deploy directory", accessor: func(instance Instance, topo Topology) string { return instance.DeployDir() }}, + {dirKind: "data directory", accessor: func(instance Instance, topo Topology) string { return instance.DataDir() }}, + {dirKind: "log directory", accessor: func(instance Instance, topo Topology) string { return instance.LogDir() }}, + } + hostDirAccessor := []DirAccessor{ + {dirKind: "monitor deploy directory", accessor: func(instance Instance, topo Topology) string { + m := topo.BaseTopo().MonitoredOptions + if m == nil { + return "" + } + return m.DeployDir + }}, + {dirKind: "monitor log directory", accessor: func(instance Instance, topo Topology) string { + m := topo.BaseTopo().MonitoredOptions + if m == nil { + return "" + } + return m.LogDir + }}, } - return -1, false + + return instanceDirAccessor, hostDirAccessor +} + +// DirEntry stands for a directory with attributes and instance +type DirEntry struct { + clusterName string + dirKind string + dir string + instance Instance +} + +func appendEntries(name string, topo Topology, inst Instance, dirAccessor DirAccessor, targets []DirEntry) []DirEntry { + for _, dir := range strings.Split(fixDir(topo)(dirAccessor.accessor(inst, topo)), ",") { + targets = append(targets, DirEntry{ + clusterName: name, + dirKind: dirAccessor.dirKind, + dir: dir, + instance: inst, + }) + } + + return targets +} + +// CheckClusterDirConflict checks cluster dir conflict or overlap +func CheckClusterDirConflict(clusterList map[string]Metadata, clusterName string, topo Topology) error { + instanceDirAccessor, hostDirAccessor := dirAccessors() + var currentEntries []DirEntry + var existingEntries []DirEntry + + // rebuild existing disk status + for name, metadata := range clusterList { + if name == clusterName { + continue + } + + gotTopo := metadata.GetTopology() + + gotTopo.IterInstance(func(inst Instance) { + for _, dirAccessor := range instanceDirAccessor { + existingEntries = appendEntries(name, gotTopo, inst, dirAccessor, existingEntries) + } + }) + IterHost(gotTopo, func(inst Instance) { + for _, dirAccessor := range hostDirAccessor { + existingEntries = appendEntries(name, gotTopo, inst, dirAccessor, existingEntries) + } + }) + } + + topo.IterInstance(func(inst Instance) { + for _, dirAccessor := range instanceDirAccessor { + currentEntries = appendEntries(clusterName, topo, inst, dirAccessor, currentEntries) + } + }) + IterHost(topo, 
func(inst Instance) {
+		for _, dirAccessor := range hostDirAccessor {
+			currentEntries = appendEntries(clusterName, topo, inst, dirAccessor, currentEntries)
+		}
+	})
+
+	for _, d1 := range currentEntries {
+		// data_dir is relative to deploy_dir by default, so they can be with
+		// same (sub) paths as long as the deploy_dirs are different
+		if d1.dirKind == "data directory" && !strings.HasPrefix(d1.dir, "/") {
+			continue
+		}
+		for _, d2 := range existingEntries {
+			if d1.instance.GetManageHost() != d2.instance.GetManageHost() {
+				continue
+			}
+
+			//// ignore conflict in the case when both sides are monitor and either one of them
+			//// is marked as ignore exporter.
+			//if strings.HasPrefix(d1.dirKind, "monitor") &&
+			//	strings.HasPrefix(d2.dirKind, "monitor") &&
+			//	(d1.instance.IgnoreMonitorAgent() || d2.instance.IgnoreMonitorAgent()) {
+			//	continue
+			//}
+
+			if d1.dir == d2.dir && d1.dir != "" {
+				properties := map[string]string{
+					"ThisDirKind":    d1.dirKind,
+					"ThisDir":        d1.dir,
+					"ThisComponent":  d1.instance.ComponentName(),
+					"ThisHost":       d1.instance.GetManageHost(),
+					"ExistCluster":   d2.clusterName,
+					"ExistDirKind":   d2.dirKind,
+					"ExistDir":       d2.dir,
+					"ExistComponent": d2.instance.ComponentName(),
+					"ExistHost":      d2.instance.GetManageHost(),
+				}
+				zap.L().Info("Meet deploy directory conflict", zap.Any("info", properties))
+				return errDeployDirConflict.New("Deploy directory conflicts with an existing cluster").WithProperty(gui.SuggestionFromTemplate(`
+The directory you specified in the topology file is:
+  Directory: {{ColorKeyword}}{{.ThisDirKind}} {{.ThisDir}}{{ColorReset}}
+  Component: {{ColorKeyword}}{{.ThisComponent}} {{.ThisHost}}{{ColorReset}}
+
+It conflicts with a directory in the existing cluster:
+  Existing Cluster Name: {{ColorKeyword}}{{.ExistCluster}}{{ColorReset}}
+  Existing Directory:    {{ColorKeyword}}{{.ExistDirKind}} {{.ExistDir}}{{ColorReset}}
+  Existing Component:    {{ColorKeyword}}{{.ExistComponent}} {{.ExistHost}}{{ColorReset}}
+
+Please change to use another directory or another host.
+`, properties))
+			}
+		}
+	}
+
+	return CheckClusterDirOverlap(currentEntries)
+}
+
+// CheckClusterDirOverlap checks whether cluster directories overlap with data or log directories.
+// This should only be used across clusters; deploying logs under data directories,
+// and vice versa, is not allowed.
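+// For example, data_dir "/data" and log_dir "/data/logs" on the same host count
+// as an overlap and are rejected; deploy directories are exempt from this check.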
+func CheckClusterDirOverlap(entries []DirEntry) error {
+	ignore := func(d1, d2 DirEntry) bool {
+		return (d1.instance.GetManageHost() != d2.instance.GetManageHost()) ||
+			d1.dir == "" || d2.dir == "" ||
+			strings.HasSuffix(d1.dirKind, "deploy directory") ||
+			strings.HasSuffix(d2.dirKind, "deploy directory")
+	}
+	for i := 0; i < len(entries)-1; i++ {
+		d1 := entries[i]
+		for j := i + 1; j < len(entries); j++ {
+			d2 := entries[j]
+			if ignore(d1, d2) {
+				continue
+			}
+
+			if utils.IsSubDir(d1.dir, d2.dir) || utils.IsSubDir(d2.dir, d1.dir) {
+				properties := map[string]string{
+					"ThisDirKind":   d1.dirKind,
+					"ThisDir":       d1.dir,
+					"ThisComponent": d1.instance.ComponentName(),
+					"ThisHost":      d1.instance.GetManageHost(),
+					"ThatDirKind":   d2.dirKind,
+					"ThatDir":       d2.dir,
+					"ThatComponent": d2.instance.ComponentName(),
+					"ThatHost":      d2.instance.GetManageHost(),
+				}
+				zap.L().Info("Meet deploy directory overlap", zap.Any("info", properties))
+				return errDeployDirOverlap.New("Deploy directory overlaps with another instance").WithProperty(gui.SuggestionFromTemplate(`
+The directory you specified in the topology file is:
+  Directory: {{ColorKeyword}}{{.ThisDirKind}} {{.ThisDir}}{{ColorReset}}
+  Component: {{ColorKeyword}}{{.ThisComponent}} {{.ThisHost}}{{ColorReset}}
+
+It overlaps with another instance:
+  Other Directory: {{ColorKeyword}}{{.ThatDirKind}} {{.ThatDir}}{{ColorReset}}
+  Other Component: {{ColorKeyword}}{{.ThatComponent}} {{.ThatHost}}{{ColorReset}}
+
+Please modify the topology file and try again.
+`, properties))
+			}
+		}
+	}
+
+	return nil
+}
+
+// CheckClusterPortConflict checks cluster port conflict
+func CheckClusterPortConflict(clusterList map[string]Metadata, clusterName string, topo Topology) error {
+	type Entry struct {
+		clusterName   string
+		componentName string
+		port          int
+		instance      Instance
+	}
+
+	currentEntries := []Entry{}
+	existingEntries := []Entry{}
+
+	for name, metadata := range clusterList {
+		if name == clusterName {
+			continue
+		}
+
+		metadata.GetTopology().IterInstance(func(inst Instance) {
+			for _, port := range inst.UsedPorts() {
+				existingEntries = append(existingEntries, Entry{
+					clusterName:   name,
+					componentName: inst.ComponentName(),
+					port:          port,
+					instance:      inst,
+				})
+			}
+		})
+	}
+
+	topo.IterInstance(func(inst Instance) {
+		for _, port := range inst.UsedPorts() {
+			currentEntries = append(currentEntries, Entry{
+				componentName: inst.ComponentName(),
+				port:          port,
+				instance:      inst,
+			})
+		}
+	})
+
+	for _, p1 := range currentEntries {
+		for _, p2 := range existingEntries {
+			if p1.instance.GetManageHost() != p2.instance.GetManageHost() {
+				continue
+			}
+
+			if p1.port == p2.port {
+				// build the conflict info
+				properties := map[string]string{
+					"ThisPort":       strconv.Itoa(p1.port),
+					"ThisComponent":  p1.componentName,
+					"ThisHost":       p1.instance.GetManageHost(),
+					"ExistCluster":   p2.clusterName,
+					"ExistPort":      strconv.Itoa(p2.port),
+					"ExistComponent": p2.componentName,
+					"ExistHost":      p2.instance.GetManageHost(),
+				}
+
+				// build error message
+				zap.L().Info("Meet deploy port conflict", zap.Any("info", properties))
+				return errDeployPortConflict.New("Deploy port conflicts with an existing cluster").WithProperty(gui.SuggestionFromTemplate(`
+The port you specified in the topology file is:
+  Port:      {{ColorKeyword}}{{.ThisPort}}{{ColorReset}}
+  Component: {{ColorKeyword}}{{.ThisComponent}} {{.ThisHost}}{{ColorReset}}
+
+It conflicts with a port in the existing cluster:
+  Existing Cluster Name: {{ColorKeyword}}{{.ExistCluster}}{{ColorReset}}
+  Existing Port: 
{{ColorKeyword}}{{.ExistPort}}{{ColorReset}} + Existing Component: {{ColorKeyword}}{{.ExistComponent}} {{.ExistHost}}{{ColorReset}} + +Please change to use another port or another host. +`, properties)) + } + } + } + + return nil +} + +//lint:ignore U1000 keep this +func getHostFromAddress(addr string) string { + host, _ := utils.ParseHostPort(addr) + return host +} + +// platformConflictsDetect checks for conflicts in topology for different OS / Arch +// set to the same host / IP +func (s *Specification) platformConflictsDetect() error { + type ( + conflict struct { + os string + arch string + cfg string + } + ) + + platformStats := map[string]conflict{} + topoSpec := reflect.ValueOf(s).Elem() + topoType := reflect.TypeOf(s).Elem() + + for i := 0; i < topoSpec.NumField(); i++ { + if isSkipField(topoSpec.Field(i)) { + continue + } + + compSpecs := topoSpec.Field(i) + for index := 0; index < compSpecs.Len(); index++ { + compSpec := reflect.Indirect(compSpecs.Index(index)) + // check hostname + host := compSpec.FieldByName("Host").String() + cfg := strings.Split(topoType.Field(i).Tag.Get("yaml"), ",")[0] // without meta + if host == "" { + return errors.Errorf("`%s` contains empty host field", cfg) + } + + // platform conflicts + stat := conflict{ + cfg: cfg, + } + if j, found := findField(compSpec, "OS"); found { + stat.os = compSpec.Field(j).String() + } + if j, found := findField(compSpec, "Arch"); found { + stat.arch = compSpec.Field(j).String() + } + + prev, exist := platformStats[host] + if exist { + if prev.os != stat.os || prev.arch != stat.arch { + return &meta.ValidateErr{ + Type: meta.TypeMismatch, + Target: "platform", + LHS: fmt.Sprintf("%s:%s/%s", prev.cfg, prev.os, prev.arch), + RHS: fmt.Sprintf("%s:%s/%s", stat.cfg, stat.os, stat.arch), + Value: host, + } + } + } + platformStats[host] = stat + } + } + return nil +} + +func (s *Specification) portInvalidDetect() error { + topoSpec := reflect.ValueOf(s).Elem() + topoType := reflect.TypeOf(s).Elem() + + checkPort := func(idx int, compSpec reflect.Value) error { + compSpec = reflect.Indirect(compSpec) + cfg := strings.Split(topoType.Field(idx).Tag.Get("yaml"), ",")[0] + + for i := 0; i < compSpec.NumField(); i++ { + if strings.HasSuffix(compSpec.Type().Field(i).Name, "Port") { + port := int(compSpec.Field(i).Int()) + // for NgPort, 0 means default and -1 means disable + if compSpec.Type().Field(i).Name == "NgPort" && (port == -1 || port == 0) { + continue + } + if port < 1 || port > 65535 { + portField := strings.Split(compSpec.Type().Field(i).Tag.Get("yaml"), ",")[0] + return errors.Errorf("`%s` of %s=%d is invalid, port should be in the range [1, 65535]", cfg, portField, port) + } + } + } + return nil + } + + for i := 0; i < topoSpec.NumField(); i++ { + compSpecs := topoSpec.Field(i) + + // check on struct + if compSpecs.Kind() == reflect.Struct { + if err := checkPort(i, compSpecs); err != nil { + return err + } + continue + } + + // check on slice + for index := 0; index < compSpecs.Len(); index++ { + compSpec := reflect.Indirect(compSpecs.Index(index)) + if err := checkPort(i, compSpec); err != nil { + return err + } + } + } + + return nil +} + +func (s *Specification) portConflictsDetect() error { + type ( + usedPort struct { + host string + port int + } + conflict struct { + tp string + cfg string + } + ) + + portTypes := []string{ + "Port", + "StatusPort", + "PeerPort", + "ClientPort", + "WebPort", + "TCPPort", + "HTTPPort", + "FlashServicePort", + "FlashProxyPort", + "FlashProxyStatusPort", + "ClusterPort", + "NgPort", + 
} + + portStats := map[usedPort]conflict{} + uniqueHosts := set.NewStringSet() + topoSpec := reflect.ValueOf(s).Elem() + topoType := reflect.TypeOf(s).Elem() + + for i := 0; i < topoSpec.NumField(); i++ { + if isSkipField(topoSpec.Field(i)) { + continue + } + + compSpecs := topoSpec.Field(i) + for index := 0; index < compSpecs.Len(); index++ { + compSpec := reflect.Indirect(compSpecs.Index(index)) + + // check hostname + host := compSpec.FieldByName("Host").String() + cfg := strings.Split(topoType.Field(i).Tag.Get("yaml"), ",")[0] // without meta + if host == "" { + return errors.Errorf("`%s` contains empty host field", cfg) + } + uniqueHosts.Insert(host) + + // Ports conflicts + for _, portType := range portTypes { + if j, found := findField(compSpec, portType); found { + item := usedPort{ + host: host, + port: int(compSpec.Field(j).Int()), + } + tp := compSpec.Type().Field(j).Tag.Get("yaml") + prev, exist := portStats[item] + if exist { + return &meta.ValidateErr{ + Type: meta.TypeConflict, + Target: "port", + LHS: fmt.Sprintf("%s:%s.%s", prev.cfg, item.host, prev.tp), + RHS: fmt.Sprintf("%s:%s.%s", cfg, item.host, tp), + Value: item.port, + } + } + portStats[item] = conflict{ + tp: tp, + cfg: cfg, + } + } + } + } + } + + return nil +} + +func (s *Specification) dirConflictsDetect() error { + type ( + usedDir struct { + host string + dir string + } + conflict struct { + tp string + cfg string + imported bool + } + ) + + dirTypes := []string{ + "DataDir", + "DeployDir", + } + + // usedInfo => type + var dirStats = map[usedDir]conflict{} + + topoSpec := reflect.ValueOf(s).Elem() + topoType := reflect.TypeOf(s).Elem() + + for i := 0; i < topoSpec.NumField(); i++ { + if isSkipField(topoSpec.Field(i)) { + continue + } + + compSpecs := topoSpec.Field(i) + for index := 0; index < compSpecs.Len(); index++ { + compSpec := reflect.Indirect(compSpecs.Index(index)) + // check hostname + host := compSpec.FieldByName("Host").String() + cfg := strings.Split(topoType.Field(i).Tag.Get("yaml"), ",")[0] // without meta + if host == "" { + return errors.Errorf("`%s` contains empty host field", cfg) + } + + // Directory conflicts + for _, dirType := range dirTypes { + j, found := findField(compSpec, dirType) + if !found { + continue + } + + // `yaml:"data_dir,omitempty"` + tp := strings.Split(compSpec.Type().Field(j).Tag.Get("yaml"), ",")[0] + for _, dir := range strings.Split(compSpec.Field(j).String(), ",") { + dir = strings.TrimSpace(dir) + item := usedDir{ + host: host, + dir: dir, + } + // data_dir is relative to deploy_dir by default, so they can be with + // same (sub) paths as long as the deploy_dirs are different + if item.dir != "" && !strings.HasPrefix(item.dir, "/") { + continue + } + prev, exist := dirStats[item] + // not checking between imported nodes + if exist { + return &meta.ValidateErr{ + Type: meta.TypeConflict, + Target: "directory", + LHS: fmt.Sprintf("%s:%s.%s", prev.cfg, item.host, prev.tp), + RHS: fmt.Sprintf("%s:%s.%s", cfg, item.host, tp), + Value: item.dir, + } + } + // not reporting error for nodes imported from openGemini-Ansible, but keep + // their dirs in the map to check if other nodes are using them + dirStats[item] = conflict{ + tp: tp, + cfg: cfg, + imported: false, + } + } + } + } + } + + return nil } // CountDir counts for dir paths used by any instance in the cluster with the same @@ -56,6 +643,7 @@ func (s *Specification) CountDir(targetHost, dirPrefix string) int { if isSkipField(topoSpec.Field(i)) { continue } + compSpecs := topoSpec.Field(i) for index := 0; index 
< compSpecs.Len(); index++ {
 			compSpec := reflect.Indirect(compSpecs.Index(index))
@@ -112,8 +700,168 @@ func (s *Specification) CountDir(targetHost, dirPrefix string) int {
 	return count
 }
 
+func (s *Specification) validateTLSEnabled() error {
+	if !s.GlobalOptions.TLSEnabled {
+		return nil
+	}
+
+	// check for component with no tls support
+	compList := make([]Component, 0)
+	s.IterComponent(func(c Component) {
+		if len(c.Instances()) > 0 {
+			compList = append(compList, c)
+		}
+	})
+
+	for _, c := range compList {
+		switch c.Name() {
+		case ComponentOpenGemini,
+			ComponentGrafana:
+		default:
+			return errors.Errorf("component %s is not supported in TLS enabled cluster", c.Name())
+		}
+	}
+	return nil
+}
+
+func (s *Specification) validateUserGroup() error {
+	gOpts := s.GlobalOptions
+	if user := gOpts.User; !reUser.MatchString(user) {
+		return errors.WithMessagef(ErrUserOrGroupInvalid, "`global` of user='%s' is invalid", user)
+	}
+	// if group is nil, then we'll set it to the same as user
+	if group := gOpts.Group; group != "" && !reGroup.MatchString(group) {
+		return errors.WithMessagef(ErrUserOrGroupInvalid, "`global` of group='%s' is invalid", group)
+	}
+	return nil
+}
+
+func (s *Specification) validateTSMetaNames() error {
+	// check ts-meta-server name
+	metaNames := set.NewStringSet()
+	for _, m := range s.TSMetaServers {
+		if m.Name == "" {
+			continue
+		}
+
+		if metaNames.Exist(m.Name) {
+			return errors.Errorf("duplicated ts_meta_servers.name is not supported, name %s is duplicated", m.Name)
+		}
+		metaNames.Insert(m.Name)
+	}
+	return nil
+}
+
+// validateMonitorAgent checks for conflicts in topology for different ignore_exporter
+// settings for multiple instances on the same host / IP
+//
+//lint:ignore U1000 keep this
+func (s *Specification) validateMonitorAgent() error {
+	type (
+		conflict struct {
+			ignore bool
+			cfg    string
+		}
+	)
+	agentStats := map[string]conflict{}
+	topoSpec := reflect.ValueOf(s).Elem()
+	topoType := reflect.TypeOf(s).Elem()
+
+	for i := 0; i < topoSpec.NumField(); i++ {
+		if isSkipField(topoSpec.Field(i)) {
+			continue
+		}
+
+		compSpecs := topoSpec.Field(i)
+		for index := 0; index < compSpecs.Len(); index++ {
+			compSpec := reflect.Indirect(compSpecs.Index(index))
+
+			// check hostname
+			host := compSpec.FieldByName("Host").String()
+			cfg := strings.Split(topoType.Field(i).Tag.Get("yaml"), ",")[0] // without meta
+			if host == "" {
+				return errors.Errorf("`%s` contains empty host field", cfg)
+			}
+
+			// agent conflicts
+			stat := conflict{}
+			if j, found := findField(compSpec, "IgnoreExporter"); found {
+				stat.ignore = compSpec.Field(j).Bool()
+				stat.cfg = cfg
+			}
+
+			prev, exist := agentStats[host]
+			if exist {
+				if prev.ignore != stat.ignore {
+					return &meta.ValidateErr{
+						Type:   meta.TypeMismatch,
+						Target: "ignore_exporter",
+						LHS:    fmt.Sprintf("%s:%v", prev.cfg, prev.ignore),
+						RHS:    fmt.Sprintf("%s:%v", stat.cfg, stat.ignore),
+						Value:  host,
+					}
+				}
+			}
+			agentStats[host] = stat
+		}
+	}
+	return nil
+}
+
 // Validate validates the topology specification and produce error if the
 // specification invalid (e.g: port conflicts or directory conflicts)
 func (s *Specification) Validate() error {
+	validators := []func() error{
+		s.validateTLSEnabled,
+		s.platformConflictsDetect,
+		s.portInvalidDetect,
+		s.portConflictsDetect,
+		s.dirConflictsDetect,
+		s.validateUserGroup,
+		s.validateTSMetaNames,
+	}
+
+	for _, v := range validators {
+		if err := v(); err != nil {
+			return errors.WithStack(err)
+		}
+	}
+
+	return RelativePathDetect(s, isSkipField)
+}
+
+// RelativePathDetect detects whether some specific paths are relative paths and reports an error
+func RelativePathDetect(topo any, isSkipField func(reflect.Value) bool) error {
+	pathTypes := []string{
+		"ConfigFilePath",
+		"RuleDir",
+		"DashboardDir",
+	}
+
+	topoSpec := reflect.ValueOf(topo).Elem()
+
+	for i := 0; i < topoSpec.NumField(); i++ {
+		if isSkipField(topoSpec.Field(i)) {
+			continue
+		}
+
+		compSpecs := topoSpec.Field(i)
+		for index := 0; index < compSpecs.Len(); index++ {
+			compSpec := reflect.Indirect(compSpecs.Index(index))
+
+			// Relative path detect
+			for _, field := range pathTypes {
+				if j, found := findField(compSpec, field); found {
+					// `yaml:"xxxx,omitempty"`
+					fieldName := strings.Split(compSpec.Type().Field(j).Tag.Get("yaml"), ",")[0]
+					localPath := compSpec.Field(j).String()
+					if localPath != "" && !strings.HasPrefix(localPath, "/") {
+						return fmt.Errorf("relative path is not allowed for field %s: %s", fieldName, localPath)
+					}
+				}
+			}
+		}
+	}
 	return nil
 }
diff --git a/pkg/cluster/task/builder.go b/pkg/cluster/task/builder.go
index b7eecec..209f721 100644
--- a/pkg/cluster/task/builder.go
+++ b/pkg/cluster/task/builder.go
@@ -162,6 +162,22 @@ func (b *Builder) CopyComponent(pkgSrc, component, os, arch string,
 	return b
 }
 
+// MonitoredConfig appends a MonitoredConfig task to the current task collection
+func (b *Builder) MonitoredConfig(clusterName, comp, host string, info *spec.MonitorHostInfo, globResCtl meta.ResourceControl, options *spec.TSMonitoredOptions, deployUser string, tlsEnabled bool, paths meta.DirPaths) *Builder {
+	b.tasks = append(b.tasks, &MonitoredConfig{
+		clusterName: clusterName,
+		component:   comp,
+		host:        host,
+		info:        info,
+		globResCtl:  globResCtl,
+		options:     options,
+		deployUser:  deployUser,
+		tlsEnabled:  tlsEnabled,
+		paths:       paths,
+	})
+	return b
+}
+
 // SSHKeyGen appends a SSHKeyGen task to the current task collection
 func (b *Builder) SSHKeyGen(keypath string) *Builder {
 	b.tasks = append(b.tasks, &SSHKeyGen{
diff --git a/pkg/cluster/task/install_package.go b/pkg/cluster/task/install_package.go
index 9320738..1a6e689 100644
--- a/pkg/cluster/task/install_package.go
+++ b/pkg/cluster/task/install_package.go
@@ -51,7 +51,7 @@ func (c *InstallPackage) Execute(ctx context.Context) error {
 
 	var cmd string
 	switch c.component {
-	case spec.ComponentTSMeta, spec.ComponentTSSql, spec.ComponentTSStore, spec.ComponentTSMonitor:
+	case spec.ComponentTSMeta, spec.ComponentTSSql, spec.ComponentTSStore, spec.ComponentTSMonitor, spec.ComponentTSServer:
 		cmd = fmt.Sprintf(`tar --no-same-owner -zxf %s -C %s --wildcards '*%s' && mv %s/usr/bin/ts-* %s && rm -r %s/usr && rm %s`, dstPath, dstDir, c.component, dstDir, dstDir, dstDir, dstPath)
 	default:
 		cmd = fmt.Sprintf(`tar --no-same-owner -zxf %s -C %s && rm %s`, dstPath, dstDir, dstPath)
diff --git a/pkg/cluster/task/monitored_config.go b/pkg/cluster/task/monitored_config.go
new file mode 100644
index 0000000..45933a7
--- /dev/null
+++ b/pkg/cluster/task/monitored_config.go
@@ -0,0 +1,162 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "context" + "fmt" + "path/filepath" + "strings" + + "github.com/google/uuid" + "github.com/openGemini/gemix/pkg/cluster/ctxt" + "github.com/openGemini/gemix/pkg/cluster/spec" + "github.com/openGemini/gemix/pkg/cluster/template" + "github.com/openGemini/gemix/pkg/cluster/template/config" + "github.com/openGemini/gemix/pkg/cluster/template/scripts" + system "github.com/openGemini/gemix/pkg/cluster/template/systemd" + "github.com/openGemini/gemix/pkg/meta" + "github.com/openGemini/gemix/pkg/utils" + "github.com/pkg/errors" +) + +// MonitoredConfig is used to generate the monitor node configuration +type MonitoredConfig struct { + clusterName string + component string + host string + info *spec.MonitorHostInfo + globResCtl meta.ResourceControl + options *spec.TSMonitoredOptions + deployUser string + tlsEnabled bool + paths meta.DirPaths +} + +// Execute implements the Task interface +func (m *MonitoredConfig) Execute(ctx context.Context) error { + // Copy to remote server + exec, found := ctxt.GetInner(ctx).GetExecutor(m.host) + if !found { + return ErrNoExecutor + } + + if err := utils.MkdirAll(m.paths.Cache, 0755); err != nil { + return err + } + + if err := m.syncMonitoredSystemConfig(ctx, exec, m.component); err != nil { + return err + } + + var cfg template.ConfigGenerator + switch m.component { + case spec.ComponentTSMonitor: + monitorConfig := &config.TsMonitorConfig{ + Host: m.host, + MetricPath: m.info.MetricPath, + ErrorLogPath: m.info.ErrorLogPath, + DataPath: m.info.DataPath, + WALPath: m.info.WALPath, + ProcessName: strings.Join(m.info.ProcessName.Slice(), ","), + MonitorAddr: m.info.MonitorAddr, + MonitorDB: strings.Replace(m.clusterName, "-", "_", -1), + TLSEnabled: false, + LoggingPath: m.paths.Log, + } + if err := m.syncTsMonitorConfig(ctx, exec, monitorConfig); err != nil { + return err + } + cfg = &scripts.TSMonitorScript{ + DeployDir: m.paths.Deploy, + LogDir: m.paths.Log, + } + default: + return fmt.Errorf("unknown monitored component %s", m.component) + } + + return m.syncMonitoredScript(ctx, exec, m.component, cfg) +} + +func (m *MonitoredConfig) syncMonitoredSystemConfig(ctx context.Context, exec ctxt.Executor, comp string) (err error) { + sysCfg := filepath.Join(m.paths.Cache, fmt.Sprintf("%s-%s.service", comp, m.host)) + + //// insert checkpoint + //point := checkpoint.Acquire(ctx, spec.CopyConfigFile, map[string]any{"config-file": sysCfg}) + //defer func() { + // point.Release(err, zap.String("config-file", sysCfg)) + //}() + //if point.Hit() != nil { + // return nil + //} + + user := "root" // TODO: use real user + + resource := spec.MergeResourceControl(m.globResCtl, m.globResCtl) + systemCfg := system.NewConfig(comp, user, m.paths.Deploy). + WithMemoryLimit(resource.MemoryLimit). + WithCPUQuota(resource.CPUQuota). + WithLimitCORE(resource.LimitCORE). + WithIOReadBandwidthMax(resource.IOReadBandwidthMax). 
+		WithIOWriteBandwidthMax(resource.IOWriteBandwidthMax)
+
+	if err = systemCfg.ConfigToFile(sysCfg); err != nil {
+		return errors.WithStack(err)
+	}
+	tgt := filepath.Join("/tmp", comp+"_"+uuid.New().String()+".service")
+	if err = exec.Transfer(ctx, sysCfg, tgt, false, 0, false); err != nil {
+		return errors.WithMessagef(err, "transfer from %s to %s failed", sysCfg, tgt)
+	}
+	serviceFile := fmt.Sprintf("%s.service", comp)
+	cmd := fmt.Sprintf("mv %s /etc/systemd/system/%s", tgt, serviceFile)
+	if _, _, err := exec.Execute(ctx, cmd, true); err != nil {
+		return errors.WithMessagef(err, "execute: %s", cmd)
+	}
+	return nil
+}
+
+func (m *MonitoredConfig) syncMonitoredScript(ctx context.Context, exec ctxt.Executor, comp string, cfg template.ConfigGenerator) error {
+	fp := filepath.Join(m.paths.Cache, fmt.Sprintf("run_%s_%s.sh", comp, m.host))
+	if err := cfg.ConfigToFile(fp); err != nil {
+		return err
+	}
+	dst := filepath.Join(m.paths.Deploy, "scripts", fmt.Sprintf("run_%s.sh", comp))
+	if err := exec.Transfer(ctx, fp, dst, false, 0, false); err != nil {
+		return err
+	}
+	if _, _, err := exec.Execute(ctx, "chmod +x "+dst, false); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (m *MonitoredConfig) syncTsMonitorConfig(ctx context.Context, exec ctxt.Executor, cfg template.ConfigGenerator) error {
+	fp := filepath.Join(m.paths.Cache, fmt.Sprintf("ts-monitor_%s.toml", m.host))
+	if err := cfg.ConfigToFile(fp); err != nil {
+		return err
+	}
+	dst := filepath.Join(m.paths.Deploy, "conf", "ts-monitor.toml")
+	return exec.Transfer(ctx, fp, dst, false, 0, false)
+}
+
+// Rollback implements the Task interface
+func (m *MonitoredConfig) Rollback(ctx context.Context) error {
+	return ErrUnsupportedRollback
+}
+
+// String implements the fmt.Stringer interface
+func (m *MonitoredConfig) String() string {
+	return fmt.Sprintf("MonitoredConfig: cluster=%s, user=%s, %v", m.clusterName, m.deployUser, m.paths)
+}
diff --git a/pkg/cluster/template/config/ts-monitor.go b/pkg/cluster/template/config/ts-monitor.go
new file mode 100644
index 0000000..4ba6c06
--- /dev/null
+++ b/pkg/cluster/template/config/ts-monitor.go
@@ -0,0 +1,71 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+	"bytes"
+	"path"
+	"text/template"
+
+	"github.com/openGemini/gemix/embed"
+	"github.com/openGemini/gemix/pkg/utils"
+)
+
+// TsMonitorConfig represents the data used to generate the ts-monitor config
+type TsMonitorConfig struct {
+	Host         string
+	MetricPath   string
+	ErrorLogPath string
+	DataPath     string
+	WALPath      string
+	ProcessName  string
+	MonitorAddr  string
+	MonitorDB    string
+	TLSEnabled   bool
+	LoggingPath  string
+}
+
+// Config generates the config file data.
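+// The TOML template is loaded from the embedded file
+// templates/config/ts-monitor.toml.tpl.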
+func (c *TsMonitorConfig) Config() ([]byte, error) {
+	fp := path.Join("templates", "config", "ts-monitor.toml.tpl")
+	tpl, err := embed.ReadTemplate(fp)
+	if err != nil {
+		return nil, err
+	}
+	return c.ConfigWithTemplate(string(tpl))
+}
+
+// ConfigToFile writes config content to specific path
+func (c *TsMonitorConfig) ConfigToFile(file string) error {
+	config, err := c.Config()
+	if err != nil {
+		return err
+	}
+	return utils.WriteFile(file, config, 0755)
+}
+
+// ConfigWithTemplate generates the ts-monitor config content from the given template
+func (c *TsMonitorConfig) ConfigWithTemplate(tpl string) ([]byte, error) {
+	tmpl, err := template.New("TSMonitorConfig").Parse(tpl)
+	if err != nil {
+		return nil, err
+	}
+
+	content := bytes.NewBufferString("")
+	if err := tmpl.Execute(content, c); err != nil {
+		return nil, err
+	}
+
+	return content.Bytes(), nil
+}
diff --git a/pkg/cluster/template/scripts/ts_monitor.go b/pkg/cluster/template/scripts/ts_monitor.go
index 7726a50..c16909e 100644
--- a/pkg/cluster/template/scripts/ts_monitor.go
+++ b/pkg/cluster/template/scripts/ts_monitor.go
@@ -29,23 +29,36 @@ type TSMonitorScript struct {
 	LogDir string
 }
 
-// ConfigToFile write config content to specific path
-func (c *TSMonitorScript) ConfigToFile(file string) error {
+// Config generates the config file data.
+func (c *TSMonitorScript) Config() ([]byte, error) {
 	fp := path.Join("templates", "scripts", "run_ts_monitor.sh.tpl")
 	tpl, err := embed.ReadTemplate(fp)
 	if err != nil {
-		return errors.WithStack(err)
+		return nil, errors.WithStack(err)
 	}
+	return c.ConfigWithTemplate(string(tpl))
+}
 
-	tmpl, err := template.New("TSMonitor").Parse(string(tpl))
+// ConfigWithTemplate generates the ts-monitor startup script content from the given template
+func (c *TSMonitorScript) ConfigWithTemplate(tpl string) ([]byte, error) {
+	tmpl, err := template.New("TSMonitorScript").Parse(tpl)
 	if err != nil {
-		return errors.WithStack(err)
+		return nil, errors.WithStack(err)
 	}
 
 	content := bytes.NewBufferString("")
-	if err := tmpl.Execute(content, c); err != nil {
-		return errors.WithStack(err)
+	if err = tmpl.Execute(content, c); err != nil {
+		return nil, errors.WithStack(err)
 	}
 
-	return utils.WriteFile(file, content.Bytes(), 0750)
+	return content.Bytes(), nil
+}
+
+// ConfigToFile writes config content to specific path
+func (c *TSMonitorScript) ConfigToFile(file string) error {
+	config, err := c.Config()
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	return utils.WriteFile(file, config, 0755)
+}
diff --git a/pkg/cluster/template/scripts/ts_server.go b/pkg/cluster/template/scripts/ts_server.go
new file mode 100644
index 0000000..f35940f
--- /dev/null
+++ b/pkg/cluster/template/scripts/ts_server.go
@@ -0,0 +1,51 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package scripts
+
+import (
+	"bytes"
+	"path"
+	"text/template"
+
+	"github.com/openGemini/gemix/embed"
+	"github.com/openGemini/gemix/pkg/utils"
+	"github.com/pkg/errors"
+)
+
+// TSServerScript represents the data to generate the ts-server startup script
+type TSServerScript struct {
+	DeployDir string
+	LogDir    string
+}
+
+// ConfigToFile writes config content to specific path
+func (c *TSServerScript) ConfigToFile(file string) error {
+	fp := path.Join("templates", "scripts", "run_ts_server.sh.tpl")
+	tpl, err := embed.ReadTemplate(fp)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	tmpl, err := template.New("TSServer").Parse(string(tpl))
+	if err != nil {
+		return errors.WithStack(err)
+	}
+
+	content := bytes.NewBufferString("")
+	if err := tmpl.Execute(content, c); err != nil {
+		return errors.WithStack(err)
+	}
+
+	return utils.WriteFile(file, content.Bytes(), 0750)
+}
diff --git a/pkg/cluster/template/template.go b/pkg/cluster/template/template.go
new file mode 100644
index 0000000..0716046
--- /dev/null
+++ b/pkg/cluster/template/template.go
@@ -0,0 +1,21 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package template
+
+// ConfigGenerator is used to generate configuration for a component
+type ConfigGenerator interface {
+	Config() ([]byte, error)
+	ConfigWithTemplate(tpl string) ([]byte, error)
+	ConfigToFile(file string) error
+}
diff --git a/pkg/gui/cliutil.go b/pkg/gui/cliutil.go
index 22df552..56bad71 100644
--- a/pkg/gui/cliutil.go
+++ b/pkg/gui/cliutil.go
@@ -14,8 +14,10 @@
 package gui
 
 import (
+	"bytes"
 	"fmt"
 	"strings"
+	"text/template"
 
 	"github.com/joomcode/errorx"
 	"github.com/openGemini/gemix/pkg/utils"
@@ -28,6 +30,17 @@ var (
 	errOperationAbort = errNS.NewType("operation_aborted", utils.ErrTraitPreCheck)
 )
 
+var templateFuncs = template.FuncMap{
+	"OsArgs":  func() string { return "install xxx" },
+	"OsArgs0": func() string { return "gemix cluster" },
+}
+
+func init() {
+	AddColorFunctions(func(name string, f any) {
+		templateFuncs[name] = f
+	})
+}
+
 // CheckCommandArgsAndMayPrintHelp checks whether user passes enough number of arguments.
 // If insufficient number of arguments are passed, an error with proper suggestion will be raised.
 // When no argument is passed, command help will be printed and no error will be raised.
@@ -47,12 +60,28 @@ func CheckCommandArgsAndMayPrintHelp(cmd *cobra.Command, args []string, minArgs
 	return true, nil
 }
 
+func formatSuggestion(templateStr string, data any) string {
+	t := template.Must(template.New("suggestion").Funcs(templateFuncs).Parse(templateStr))
+	var buf bytes.Buffer
+	if err := t.Execute(&buf, data); err != nil {
+		panic(err)
+	}
+	return buf.String()
+}
+
 // SuggestionFromString creates a suggestion from string.
 // Usage: SomeErrorX.WithProperty(SuggestionFromString(..))
 func SuggestionFromString(str string) (errorx.Property, string) {
 	return utils.ErrPropSuggestion, strings.TrimSpace(str)
 }
 
+// SuggestionFromTemplate creates a suggestion from a Go template. Colorize functions and some
+// other utilities are available.
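+// Color markers such as {{ColorKeyword}} and {{ColorReset}} are registered into
+// templateFuncs by AddColorFunctions in this package's init.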
diff --git a/pkg/gui/cliutil.go b/pkg/gui/cliutil.go
index 22df552..56bad71 100644
--- a/pkg/gui/cliutil.go
+++ b/pkg/gui/cliutil.go
@@ -14,8 +14,10 @@
 package gui
 
 import (
+	"bytes"
 	"fmt"
 	"strings"
+	"text/template"
 
 	"github.com/joomcode/errorx"
 	"github.com/openGemini/gemix/pkg/utils"
@@ -28,6 +30,17 @@ var (
 	errOperationAbort = errNS.NewType("operation_aborted", utils.ErrTraitPreCheck)
 )
 
+var templateFuncs = template.FuncMap{
+	"OsArgs":  func() string { return "install xxx" },
+	"OsArgs0": func() string { return "gemix cluster" },
+}
+
+func init() {
+	AddColorFunctions(func(name string, f any) {
+		templateFuncs[name] = f
+	})
+}
+
 // CheckCommandArgsAndMayPrintHelp checks whether user passes enough number of arguments.
 // If insufficient number of arguments are passed, an error with proper suggestion will be raised.
 // When no argument is passed, command help will be printed and no error will be raised.
@@ -47,12 +60,28 @@
 	return true, nil
 }
 
+func formatSuggestion(templateStr string, data any) string {
+	t := template.Must(template.New("suggestion").Funcs(templateFuncs).Parse(templateStr))
+	var buf bytes.Buffer
+	if err := t.Execute(&buf, data); err != nil {
+		panic(err)
+	}
+	return buf.String()
+}
+
 // SuggestionFromString creates a suggestion from string.
 // Usage: SomeErrorX.WithProperty(SuggestionFromString(..))
 func SuggestionFromString(str string) (errorx.Property, string) {
 	return utils.ErrPropSuggestion, strings.TrimSpace(str)
 }
 
+// SuggestionFromTemplate creates a suggestion from a Go template. Colorize functions and some other
+// utilities are available.
+// Usage: SomeErrorX.WithProperty(SuggestionFromTemplate(..))
+func SuggestionFromTemplate(templateStr string, data any) (errorx.Property, string) {
+	return SuggestionFromString(formatSuggestion(templateStr, data))
+}
+
 // SuggestionFromFormat creates a suggestion from a format.
 // Usage: SomeErrorX.WithProperty(SuggestionFromFormat(..))
 func SuggestionFromFormat(format string, a ...any) (errorx.Property, string) {
diff --git a/pkg/gui/gui.go b/pkg/gui/gui.go
index 777828b..00976bc 100644
--- a/pkg/gui/gui.go
+++ b/pkg/gui/gui.go
@@ -138,7 +138,7 @@ func PromptForPassword(format string, a ...any) string {
 
 	fmt.Printf(format, a...)
 
-	input, err := term.ReadPassword(syscall.Stdin)
+	input, err := term.ReadPassword(int(syscall.Stdin))
 	if err != nil {
 		return ""
diff --git a/pkg/meta/err.go b/pkg/meta/err.go
new file mode 100644
index 0000000..a8f1be6
--- /dev/null
+++ b/pkg/meta/err.go
@@ -0,0 +1,71 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package meta
+
+import (
+	"fmt"
+	"reflect"
+)
+
+var (
+	// ErrValidate is an empty ValidateErr object, useful for type checking
+	ErrValidate = &ValidateErr{}
+)
+
+// error types
+const (
+	TypeConflict = "conflict"
+	TypeMismatch = "mismatch"
+)
+
+// ValidateErr is the error when meta validation fails with conflicts
+type ValidateErr struct {
+	Type   string // conflict type
+	Target string // conflict Target
+	Value  any    // conflict Value
+	LHS    string // object 1
+	RHS    string // object 2
+}
+
+// Error implements the error interface
+func (e *ValidateErr) Error() string {
+	return fmt.Sprintf("%s %s for '%v' between '%s' and '%s'", e.Target, e.Type, e.Value, e.LHS, e.RHS)
+}
+
+// Unwrap implements the error interface
+func (e *ValidateErr) Unwrap() error { return nil }
+
+// Is implements the error interface
+func (e *ValidateErr) Is(target error) bool {
+	t, ok := target.(*ValidateErr)
+	if !ok {
+		return false
+	}
+
+	// check for interface Value separately
+	if e.Value != nil && t.Value != nil &&
+		(!reflect.ValueOf(e.Value).IsValid() && !reflect.ValueOf(t.Value).IsValid()) {
+		return false
+	}
+	// not supporting non-comparable values for now
+	if e.Value != nil && t.Value != nil &&
+		!(reflect.TypeOf(e.Value).Comparable() && reflect.TypeOf(t.Value).Comparable()) {
+		return false
+	}
+	return (e.Type == t.Type || t.Type == "") &&
+		(e.Target == t.Target || t.Target == "") &&
+		(e.Value == t.Value || t.Value == nil || reflect.ValueOf(t.Value).IsZero()) &&
+		(e.LHS == t.LHS || t.LHS == "") &&
+		(e.RHS == t.RHS || t.RHS == "")
+}
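ValidateErr.Is treats empty fields of the target as wildcards, which is what makes the all-empty ErrValidate usable as a type check via errors.Is. A short sketch with made-up conflict data:

package main

import (
	"errors"
	"fmt"

	"github.com/openGemini/gemix/pkg/meta"
)

func main() {
	// Hypothetical conflict: two instances configured with the same port.
	err := &meta.ValidateErr{
		Type:   meta.TypeConflict,
		Target: "port",
		Value:  8086,
		LHS:    "ts-sql 192.168.0.1",
		RHS:    "ts-sql 192.168.0.2",
	}

	// Every field of ErrValidate is empty, so it matches any ValidateErr.
	fmt.Println(errors.Is(err, meta.ErrValidate)) // true

	// A non-empty field in the target must match exactly.
	fmt.Println(errors.Is(err, &meta.ValidateErr{Type: meta.TypeMismatch})) // false
}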
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 0f7f10b..d03fa3c 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -17,6 +17,7 @@ package utils
 import (
 	"net"
 	"strconv"
+	"strings"
 )
 
 // JoinHostPort return host and port
@@ -31,3 +32,15 @@
 	}
 	return b
 }
+
+// ParseHostPort parses host and port from an address string
+func ParseHostPort(hostport string) (host, port string) {
+	colon := strings.LastIndex(hostport, ":")
+	if colon < 0 {
+		// no port separator; treat the whole string as the host
+		return hostport, ""
+	}
+	host = strings.TrimSuffix(strings.TrimPrefix(hostport[:colon], "["), "]")
+	port = hostport[colon+1:]
+	return
+}
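A quick sketch of the new helper's behavior (the addresses are examples; the last case exercises the no-colon guard added above):

package main

import (
	"fmt"

	"github.com/openGemini/gemix/pkg/utils"
)

func main() {
	host, port := utils.ParseHostPort("192.168.0.1:8091")
	fmt.Println(host, port) // 192.168.0.1 8091

	// LastIndex picks the final colon and the brackets are trimmed,
	// so bracketed IPv6 addresses split cleanly.
	host, port = utils.ParseHostPort("[::1]:8091")
	fmt.Println(host, port) // ::1 8091

	host, port = utils.ParseHostPort("localhost")
	fmt.Println(host, port) // localhost (port is empty)
}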