[DPE-2635] Add support for spark-metrics and fix pod template issue (#45)

welpaolo authored Oct 9, 2023
1 parent 55b1279 commit 21b936f
Showing 4 changed files with 227 additions and 5 deletions.
15 changes: 12 additions & 3 deletions rockcraft.yaml
@@ -57,8 +57,8 @@ services:
parts:
spark:
plugin: dump
-    source: https://github.com/canonical/central-uploader/releases/download/spark-3.4.1-ubuntu0/spark-3.4.1-ubuntu0-20230914144231-bin-k8s.tgz
-    source-checksum: sha512/365cc86e1517bf790f19eec3cd29eebe77e2a766b44712cde6cfd1c2b965f54531bf58fee4a374e08580afa80ca3f7973a9dc69f7e4729e3b7684c51d194ba5d
+    source: https://github.com/canonical/central-uploader/releases/download/spark-3.4.1-ubuntu1/spark-3.4.1-ubuntu1-20231004201041-bin-k8s.tgz
+    source-checksum: sha512/c229e0e1813873868368733f73138c47c70153e0a612823971e8b6a3ed79358ac3514e552b14c5aa579e1e0feed0dfdc672ab87b1d46e944efedb45ca538d2df
overlay-script: |
sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list
apt-get update
@@ -93,7 +93,7 @@ parts:
- opt/spark/python
- opt/spark/data

-  hadoop:
+  dependencies:
plugin: nil
after: [ spark ]
build-packages:
@@ -119,11 +119,20 @@ parts:
echo "DOWNLOAD ERROR: hadoop-aws-${HADOOP_AWS_VERSION}.jar could not be downloaded properly! Exiting...." >&2
exit 1
fi
wget -q "https://github.com/canonical/central-uploader/releases/download/spark-metrics-assembly-3.4-1.0.0/spark-metrics-assembly-3.4-1.0.0.jar"
wget -q "https://github.com/canonical/central-uploader/releases/download/spark-metrics-assembly-3.4-1.0.0/spark-metrics-assembly-3.4-1.0.0.jar.sha512"
echo "`cat spark-metrics-assembly-3.4-1.0.0.jar.sha512`" | sha512sum --check
if [[ $? -ne 0 ]]
then
echo "DOWNLOAD ERROR: spark-metrics-assembly-3.4-1.0.0.jar could not be downloaded properly! Exiting...." >&2
exit 1
fi
stage:
- opt/spark/jars
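
The download-and-verify dance above now appears twice in this part. A factored-out helper is sketched below; fetch_verified is hypothetical, and it assumes every artifact publishes a companion "<name>.sha512" file in the standard "hash  filename" layout, as the inline checks imply.

    fetch_verified() {
        local url="$1"
        local jar
        jar="$(basename "$url")"
        # fetch the artifact and its published checksum side by side
        wget -q "$url" "$url.sha512"
        if ! sha512sum --check "${jar}.sha512"; then
            echo "DOWNLOAD ERROR: ${jar} could not be downloaded properly! Exiting...." >&2
            exit 1
        fi
    }
    fetch_verified "https://github.com/canonical/central-uploader/releases/download/spark-metrics-assembly-3.4-1.0.0/spark-metrics-assembly-3.4-1.0.0.jar"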

spark8t:
plugin: nil
after: [ dependencies ]
build-packages:
- wget
- ssl-cert
149 changes: 147 additions & 2 deletions tests/integration/integration-tests.sh
@@ -20,6 +20,13 @@ validate_pi_value() {
fi
}

validate_metrics() {
log=$1
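# the test web server logs one line per pushed payload; require at least two lines mentioning "spark"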
if [ $(grep -Ri "spark" $log | wc -l) -lt 2 ]; then
exit 1
fi
}

test_restricted_account() {

kubectl config set-context spark-context --namespace=tests --cluster=prod --user=spark
@@ -112,11 +119,13 @@ setup_test_pod() {
done

MY_KUBE_CONFIG=$(cat /home/${USER}/.kube/config)
TEST_POD_TEMPLATE=$(cat tests/integration/resources/podTemplate.yaml)

kubectl exec testpod -- /bin/bash -c 'mkdir -p ~/.kube'
kubectl exec testpod -- env KCONFIG="$MY_KUBE_CONFIG" /bin/bash -c 'echo "$KCONFIG" > ~/.kube/config'
kubectl exec testpod -- /bin/bash -c 'cat ~/.kube/config'
kubectl exec testpod -- /bin/bash -c 'cp -r /opt/spark/python /var/lib/spark/'
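# ship the new pod template into the test pod so spark-submit can reference it by an in-pod path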
kubectl exec testpod -- env PTEMPLATE="$TEST_POD_TEMPLATE" /bin/bash -c 'echo "$PTEMPLATE" > /etc/spark/conf/podTemplate.yaml'
}

teardown_test_pod() {
@@ -127,28 +136,70 @@ run_example_job_in_pod() {
SPARK_EXAMPLES_JAR_NAME="spark-examples_2.12-$(get_spark_version).jar"

PREVIOUS_JOB=$(kubectl get pods | grep driver | tail -n 1 | cut -d' ' -f1)

NAMESPACE=$1
USERNAME=$2

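# values are passed as env vars so they expand inside the single-quoted command that runs in the test pod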
kubectl exec testpod -- env UU="$USERNAME" NN="$NAMESPACE" JJ="$SPARK_EXAMPLES_JAR_NAME" IM="$(spark_image)" \
/bin/bash -c 'spark-client.spark-submit \
--username $UU --namespace $NN \
--conf spark.kubernetes.driver.request.cores=100m \
--conf spark.kubernetes.executor.request.cores=100m \
--conf spark.kubernetes.container.image=$IM \
--class org.apache.spark.examples.SparkPi \
local:///opt/spark/examples/jars/$JJ 1000'

# kubectl --kubeconfig=${KUBE_CONFIG} get pods
DRIVER_JOB=$(kubectl get pods -n ${NAMESPACE} | grep driver | tail -n 1 | cut -d' ' -f1)

if [[ "${DRIVER_JOB}" == "${PREVIOUS_JOB}" ]]
then
echo "ERROR: Sample job has not run!"
exit 1
fi

# Check job output
# Sample output
# "Pi is roughly 3.13956232343"
pi=$(kubectl logs $(kubectl get pods -n ${NAMESPACE} | grep driver | tail -n 1 | cut -d' ' -f1) -n ${NAMESPACE} | grep 'Pi is roughly' | rev | cut -d' ' -f1 | rev | cut -c 1-3)
echo -e "Spark Pi Job Output: \n ${pi}"

validate_pi_value $pi
}

run_example_job_in_pod_with_pod_templates() {
SPARK_EXAMPLES_JAR_NAME="spark-examples_2.12-$(get_spark_version).jar"

PREVIOUS_JOB=$(kubectl get pods | grep driver | tail -n 1 | cut -d' ' -f1)

NAMESPACE=$1
USERNAME=$2
kubectl exec testpod -- env UU="$USERNAME" NN="$NAMESPACE" JJ="$SPARK_EXAMPLES_JAR_NAME" IM="$(spark_image)" \
/bin/bash -c 'spark-client.spark-submit \
--username $UU --namespace $NN \
--conf spark.kubernetes.driver.request.cores=100m \
--conf spark.kubernetes.executor.request.cores=100m \
--conf spark.kubernetes.container.image=$IM \
--conf spark.kubernetes.driver.podTemplateFile=/etc/spark/conf/podTemplate.yaml \
--conf spark.kubernetes.executor.podTemplateFile=/etc/spark/conf/podTemplate.yaml \
--class org.apache.spark.examples.SparkPi \
local:///opt/spark/examples/jars/$JJ 100'

# kubectl --kubeconfig=${KUBE_CONFIG} get pods
DRIVER_JOB=$(kubectl get pods -n ${NAMESPACE} | grep driver | tail -n 1 | cut -d' ' -f1)
echo "DRIVER JOB: $DRIVER_JOB"

if [[ "${DRIVER_JOB}" == "${PREVIOUS_JOB}" ]]
then
echo "ERROR: Sample job has not run!"
exit 1
fi
DRIVER_JOB_LABEL=$(kubectl get pods -n ${NAMESPACE} -lproduct=charmed-spark | grep driver | tail -n 1 | cut -d' ' -f1)
echo "DRIVER JOB_LABEL: $DRIVER_JOB_LABEL"
if [[ "${DRIVER_JOB}" != "${DRIVER_JOB_LABEL}" ]]
then
echo "ERROR: Label not present... Error in the application of the template!"
exit 1
fi

# Check job output
# Sample output
@@ -159,10 +210,69 @@ run_example_job_in_pod() {
validate_pi_value $pi
}
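
The label assertion above compares two pod listings. An equivalent, more direct check could read the label straight off the driver pod via kubectl's jsonpath output; a sketch, reusing the NAMESPACE and DRIVER_JOB variables from this function:

    LABEL=$(kubectl get pod ${DRIVER_JOB} -n ${NAMESPACE} -o jsonpath='{.metadata.labels.product}')
    if [[ "${LABEL}" != "charmed-spark" ]]; then
        echo "ERROR: Label not present... Error in the application of the template!" >&2
        exit 1
    fi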


run_example_job_in_pod_with_metrics() {
SPARK_EXAMPLES_JAR_NAME="spark-examples_2.12-$(get_spark_version).jar"
LOG_FILE="/tmp/server.log"
SERVER_PORT=9091
PREVIOUS_JOB=$(kubectl get pods | grep driver | tail -n 1 | cut -d' ' -f1)
# start simple http server
python3 tests/integration/resources/test_web_server.py $SERVER_PORT > $LOG_FILE &
HTTP_SERVER_PID=$!
# get ip address
IP_ADDRESS=$(hostname -I | cut -d ' ' -f 1)
echo "IP: $IP_ADDRESS"
NAMESPACE=$1
USERNAME=$2
kubectl exec testpod -- env PORT="$SERVER_PORT" IP="$IP_ADDRESS" UU="$USERNAME" NN="$NAMESPACE" JJ="$SPARK_EXAMPLES_JAR_NAME" IM="$(spark_image)" \
/bin/bash -c 'spark-client.spark-submit \
--username $UU --namespace $NN \
--conf spark.kubernetes.driver.request.cores=100m \
--conf spark.kubernetes.executor.request.cores=100m \
--conf spark.kubernetes.container.image=$IM \
--conf spark.metrics.conf.*.sink.prometheus.pushgateway-address="$IP:$PORT" \
--conf spark.metrics.conf.*.sink.prometheus.class=org.apache.spark.banzaicloud.metrics.sink.PrometheusSink \
--class org.apache.spark.examples.SparkPi \
local:///opt/spark/examples/jars/$JJ 1000'

# kubectl --kubeconfig=${KUBE_CONFIG} get pods
DRIVER_JOB=$(kubectl get pods -n ${NAMESPACE} | grep driver | tail -n 1 | cut -d' ' -f1)

if [[ "${DRIVER_JOB}" == "${PREVIOUS_JOB}" ]]
then
echo "ERROR: Sample job has not run!"
exit 1
fi

# Check job output
# Sample output
# "Pi is roughly 3.13956232343"
pi=$(kubectl logs $(kubectl get pods -n ${NAMESPACE} | grep driver | tail -n 1 | cut -d' ' -f1) -n ${NAMESPACE} | grep 'Pi is roughly' | rev | cut -d' ' -f1 | rev | cut -c 1-3)
echo -e "Spark Pi Job Output: \n ${pi}"

validate_pi_value $pi
# check that metrics are sent and stop the http server
echo "Number of POST done: $(wc -l $LOG_FILE)"
# kill http server
kill $HTTP_SERVER_PID
validate_metrics $LOG_FILE
}
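
When this test fails locally, the captured pushgateway traffic is the first thing to inspect. A sketch, assuming the test server writes one POSTed metrics payload per line to the LOG_FILE used above:

    wc -l < /tmp/server.log                    # how many pushes the server saw
    grep -i spark /tmp/server.log | head -n 5  # peek at the pushed payloads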


test_example_job_in_pod_with_templates() {
run_example_job_in_pod_with_pod_templates tests spark
}


test_example_job_in_pod() {
run_example_job_in_pod tests spark
}

test_example_job_in_pod_with_metrics() {
run_example_job_in_pod_with_metrics tests spark
}



run_spark_shell_in_pod() {
echo "run_spark_shell_in_pod ${1} ${2}"
@@ -225,13 +335,48 @@ cleanup_user_failure_in_pod() {
cleanup_user_failure
}

echo -e "##################################"
echo -e "SETUP TEST POD"
echo -e "##################################"

setup_test_pod

echo -e "##################################"
echo -e "RUN EXAMPLE JOB"
echo -e "##################################"

(setup_user_admin_context && test_example_job_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod

echo -e "##################################"
echo -e "RUN SPARK SHELL IN POD"
echo -e "##################################"

(setup_user_admin_context && test_spark_shell_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod

echo -e "##################################"
echo -e "RUN PYSPARK IN POD"
echo -e "##################################"

(setup_user_admin_context && test_pyspark_in_pod && cleanup_user_success) || cleanup_user_failure_in_pod

teardown_test_pod
echo -e "##################################"
echo -e "RUN EXAMPLE JOB WITH POD TEMPLATE"
echo -e "##################################"

(setup_user_admin_context && test_example_job_in_pod_with_templates && cleanup_user_success) || cleanup_user_failure_in_pod

echo -e "########################################"
echo -e "RUN EXAMPLE JOB WITH PROMETHEUS METRICS"
echo -e "########################################"

(setup_user_admin_context && test_example_job_in_pod_with_metrics && cleanup_user_success) || cleanup_user_failure_in_pod

echo -e "##################################"
echo -e "TEARDOWN TEST POD"
echo -e "##################################"

teardown_test_pod

echo -e "##################################"
echo -e "END OF THE TEST"
echo -e "##################################"
10 changes: 10 additions & 0 deletions tests/integration/resources/podTemplate.yaml
@@ -0,0 +1,10 @@
apiVersion: v1
kind: Pod
metadata:
name: label-test
labels:
product: charmed-spark
spec:
containers:
- name: spark

58 changes: 58 additions & 0 deletions tests/integration/resources/test_web_server.py
@@ -0,0 +1,58 @@
#!/usr/bin/env python
"""
Very simple HTTP server in Python.
Usage::
    ./test_web_server.py [<port>]
Send a GET request::
    curl http://localhost:9091
Send a HEAD request::
    curl -I http://localhost:9091
Send a POST request::
    curl -d "foo=bar&bin=baz" http://localhost:9091
"""
import sys

if sys.version_info[0] < 3:
    # Python 2 import
    from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
else:
    # Python 3 import
    from http.server import BaseHTTPRequestHandler, HTTPServer

class BaseServer(BaseHTTPRequestHandler):
    def _set_headers(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def do_GET(self):
        self._set_headers()
        print("<html><body><p>hello, world!</p></body></html>".encode('utf-8'))

    def do_HEAD(self):
        self._set_headers()

    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        self._set_headers()
        # echo the POSTed payload to stdout; the integration test redirects
        # stdout to a log file and greps it for the pushed metrics
        print("%s".encode('utf-8') % post_data)

def run(server_class=HTTPServer, handler_class=BaseServer, port=9091):
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print('HTTP server running on port %s' % port)
    httpd.serve_forever()

if __name__ == "__main__":
    from sys import argv

    if len(argv) == 2:
        run(port=int(argv[1]))
    else:
        run()
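
The server can be exercised by hand the same way the integration test drives it. A minimal sketch; the payload is illustrative (any body containing "spark" satisfies validate_metrics):

    python3 tests/integration/resources/test_web_server.py 9091 > /tmp/server.log &
    SERVER_PID=$!
    sleep 1
    curl -s -d 'spark_test_metric 1' http://localhost:9091
    kill $SERVER_PID
    grep -c spark /tmp/server.log   # expect at least one hit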
