forked from futurewei-cloud/caerus-tpch-spark
-
Notifications
You must be signed in to change notification settings - Fork 1
/
tpch-dike.txt
128 lines (106 loc) · 7.59 KB
/
tpch-dike.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
Run Caerus data source on tpch data with minio, spark, modified version of spark-select
1. Set up system with minio, spark, spark-select by following:
https://github.com/futurewei-cloud/caerus/blob/master/ndp/minio/minio-spark-select.docx
2. Generate 10GB tpch data by following:
https://github.com/futurewei-cloud/caerus/tree/master/tpc_benchmark/tpch
3. Copy tpch data into minio file system
mc cp -r *.tbl tpch/data/
4. Confirm with Minio GUI:
http://localhost:9000/minio/tpch/data (IP:9000)
5. Run spark-shell
root@ubuntu1804:/home/ubuntu/caerus/caerus/ndp/minio/spark-select# spark-shell --jars target/scala-2.11/spark-select-assembly-2.1.jar --conf spark.hadoop.fs.s3a.endpoint=http://localhost:9000 --conf spark.hadoop.fs.s3a.access.key=minioadmin --conf spark.hadoop.fs.s3a.secret.key=minioadmin --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.path.style.access=true
6. Import this scala code to set up schema (copy full text or spark load file) under spark-shell:
https://github.com/futurewei-cloud/caerus/blob/master/tpc_benchmark/tpch/TPCH_ON_SPARK/src/main/scala/TpchSchemaProvider.scala
7. Test if the tpch data files are ready:
scala> val schemaProvider = new TpchSchemaProvider(sc, "s3a://tpch/data/")
schemaProvider: TpchSchemaProvider = TpchSchemaProvider@2e47326b
scala> var df_supplier = schemaProvider.supplier
df_supplier: org.apache.spark.sql.DataFrame = [s_suppkey: int, s_name: string ... 5 more fields]
scala> println(df_supplier.show())
+---------+------------------+--------------------+-----------+---------------+---------+--------------------+
|s_suppkey| s_name| s_address|s_nationkey| s_phone|s_acctbal| s_comment|
+---------+------------------+--------------------+-----------+---------------+---------+--------------------+
| 1|Supplier#000000001|N kD4on9OM Ipw3,g...| 17|27-918-335-1736| 5755.94|each slyly above ...|
| 2|Supplier#000000002|89eJ5ksX3ImxJQBvx...| 5|15-679-861-2259| 4032.68|slyly bold instru...|
….
only showing top 20 rows
()
8. Test generic spark sql
1357 var df_customer = schemaProvider.customer
1358 println(df_customer.show())
1359 var sqlDF = spark.sql("SELECT c_nationkey, sum(c_acctbal) FROM customer GROUP BY c_nationkey")
1360 df_customer.createOrReplaceTempView("customer")
1361 println(sqlDF.show())
scala> println(sqlDF.show())
+-----------+--------------------+
|c_nationkey| sum(c_acctbal)|
+-----------+--------------------+
| 12|2.6934560723000014E8|
| 22|2.7077060241999966E8|
| 1| 2.684459015099997E8|
| 13|2.6986323163000005E8|
| 6| 2.7044158836E8|
| 16|2.6998903620999986E8|
| 3|2.6934730330999994E8|
| 20|2.6967652057999986E8|
| 5|2.7285513239000005E8|
| 19|2.6919152659000033E8|
| 15|2.6951752347999996E8|
| 17| 2.680707190399999E8|
| 9| 2.714341919199999E8|
| 4|2.6920711503000003E8|
| 8| 2.713024830399998E8|
| 23| 2.7068088397E8|
| 7| 2.699917798900002E8|
| 10|2.7118925304000014E8|
| 24| 2.709942146900003E8|
| 21| 2.697743783399999E8|
+-----------+--------------------+
only showing top 20 rows
9. Set up breakpoint in Mino source code in vscode use this launch.json
root@ubuntu1804:/home/ubuntu/minio/minio# cat /home/ubuntu/minio/.vscode/launch.json
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Attach to local process",
"type": "go",
"request": "launch",
"program": "${workspaceFolder}/minio",
"args": ["server", "/data"]
}
]
}root@ubuntu1804:/home/ubuntu/minio/minio#
10. Run Caerus data source: There are might be better ways to do this, for example, the tbl data don’t need to change to csv,
or the schema in the tpch provider can be reused etc. but we just want to move quickly to minio server side implementation, so use the following instead:
a. (only need do this once) Transform *.tbl file into *.csv file: e.g.
// change .tbl to .csv
scala> val schemaProvider = new TpchSchemaProvider(sc, "s3a://tpch/data/")
schemaProvider: TpchSchemaProvider = TpchSchemaProvider@660d8b16
scala> var df_customer = schemaProvider.customer
df_customer: org.apache.spark.sql.DataFrame = [c_custkey: bigint, c_name: string ... 6 more fields]
Note: Need generate one single csv file using following commands, otherwise partitioned csv folder will cause minio error (range error):
scala> df_customer.coalesce(1).write.format("csv").option("header", true).mode("overwrite").option("sep",",").save("s3a://tpch/data/customer.csv")
root@ubuntu1804:/data/tpch/data# mv /data/tpch/data/customer.csv/part-00000-7e3bbd0c-7913-4df6-9288-bc458313f65a-c000.csv /data/tpch/data/customer.csv
b. Redefine the schema:
// define schema
scala> val customer_schema = StructType(List(StructField("c_custkey", LongType, true),StructField("c_name", StringType, false), StructField("c_address", StringType, false), StructField("c_nationkey", LongType, false), StructField("c_phone", StringType, false), StructField("c_acctbal", DoubleType, false), StructField("c_mktsegment", StringType, false), StructField("c_comment", StringType, false)))
customer_schema: org.apache.spark.sql.types.StructType = StructType(StructField(c_custkey,LongType,true), StructField(c_name,StringType,false), StructField(c_address,StringType,false), StructField(c_nationkey,LongType,false), StructField(c_phone,StringType,false), StructField(c_acctbal,DoubleType,false), StructField(c_mktsegment,StringType,false), StructField(c_comment,StringType,false))
c. Send sql expression via option() to minio server side:
// sql expression
scala> val df_customer_sql = spark.read.format("CaerusSelectCSV").schema(customer_schema).option("sqlExpression", " SELECT c_nationkey, sum(c_acctbal) FROM S3object customer GROUP BY c_nationkey").load("s3a://tpch/data/customer.csv")
df_customer_sql: org.apache.spark.sql.DataFrame = [c_custkey: bigint, c_name: string ... 6 more fields]
scala> df_customer_sql.explain()
Note: df_customer_sql.explain() or println(df_customer_sql.show()) should send aggregation request to minio server side, that the bearking points of the minio code should be caught and live as showed in next step.
11. The breaking point should work in Minio, similar to this:
scala> df_customer_sql.explain()
com.amazonaws.services.s3.model.AmazonS3Exception: " (Service: Amazon S3; Status Code: 400; Error Code: ParseSelectFailure; Request ID: 163384F9F948725F; S3 Extended Request ID: f1855934-8aaa-4599-877e-3cb321f5c6cd)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1742)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1371)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1347)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1127)
... 53 elided
scala>