Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Cassandra Storage in v3 #3570

Merged
merged 11 commits into from
Oct 30, 2023
Merged

Conversation

mrproliu
Copy link
Contributor

@mrproliu mrproliu commented Oct 7, 2023

Reimplementing Cassandra Storage in Zipkin Using SkyWalking OAP Core.

Data Tables

  1. Table Creation: In the previous version, tables were created using predefined CQL files. Now, they would be dynamically created based on specific Java files (automatically checked and created when Zipkin startup).
  2. Table Names and Fields: The tables have changed due to differences in implementation principles with the original Zipkin. The corresponding relationships are as follows:
    • span: ZipkinSpanRecord.java
    • trace_by_service_span: Utilizes the Span table.
    • trace_by_service_remote_service: Utilizes the Span table.
    • span_by_service: ZipkinServiceSpanTraffic.java
    • remote_service_by_service: Utilizes the span_by_service table.
    • autocomplete_tags: Utilizes the additional table in Span, as shown here.
  3. Field Types: Previously, custom data types were used to store endpoint and annotation information. Now, these would be converted into text and stored across multiple fields.
  4. Table Storage: Previously, all data were stored in their respective standalone tables. Now, tables are created on a daily basis for data insertion and querying.

Configuration

  1. TTL (Time To Live): In the previous version of Zipkin, the default_time_to_live field was predefined in the tables to allow the database to delete data. In the new version, data would be periodically cleared from database tables in Zipkin according to the DATA_TTL configuration.
  2. Auto Complete Cardinality: This configuration has been removed in the new version, and no longer has any restrictions.

@mrproliu mrproliu added cassandra storage Group label for Storage components labels Oct 7, 2023
@mrproliu mrproliu added this to the 3.0.0 milestone Oct 7, 2023
@mrproliu mrproliu requested a review from wu-sheng October 7, 2023 03:03
@wu-sheng
Copy link
Member

wu-sheng commented Oct 7, 2023

Let's put some summary about the feature here before the review.

@anuraaga
Copy link
Contributor

anuraaga commented Oct 7, 2023

I haven't been following that well but skimmed the description and wanted to ask, if the tables change significantly as I think they did, it seems like a big migration is needed by users. If a migration would be needed, is there a reason to develop Zipkin v3 vs asking users to switch to skywalking oap? I reread JC's email and Bas's issue comment and don't think I saw that addressed.

@wu-sheng
Copy link
Member

wu-sheng commented Oct 7, 2023

I haven't been following that well but skimmed the description and wanted to ask, if the tables change significantly as I think they did, it seems like a big migration is needed by users. If a migration would be needed, is there a reason to develop Zipkin v3 vs asking users to switch to skywalking oap? I reread JC's email and Bas's issue comment and don't think I saw that addressed.

I think there was a discussion on the Zipkin admin mail list(I am not in that list).

About immigration, I think the general idea is not to keep the DB schema unchanged, we have new implementation in the OAP for MySQL/PostgreSQL/Elasticsearch/OpenSearch. And Cassandra is following the abstract.

@anuraaga
Copy link
Contributor

anuraaga commented Oct 7, 2023

About immigration, I think the general idea is not to keep the DB schema unchanged,

Thanks - indeed if this is the general idea, I wonder if all of these schema changes will be accompanied by some auto migration logic. If so, it's probably better to start by writing the migration logic before the server logic since it's always possible to run into impossible migrations.

If not, I still wonder, what will Zipkin v3 offer over switching to Skywalker for Zipkin users? I think it could be helpful to have a bullet list for that.

@jcchavezs
Copy link
Contributor

jcchavezs commented Oct 7, 2023 via email

@wu-sheng
Copy link
Member

wu-sheng commented Oct 7, 2023

Personally, I would say this kind of immigration is not worth so much effect. Because, the TTL of spans is short, such as days. In some cases, the data would lost the value before the immigration is done.

The reason for different schema designs in SkyWalking, is from what we learned from large-scale deployment. As SkyWalking agent + OAP usually work for 100% sampling. We created more concepts to help query(including SkyWalking UI and Lens UI) to have better responses.
As JC said, this is not unchangeable. We haven't began to gather feedback as currently, we are only working on this branch rather than main branch, and finish the first preview version, which makes everything work.

If not, I still wonder, what will Zipkin v3 offer over switching to Skywalker for Zipkin users? I think it could be helpful to have a bullet list for that.

This PR is the value. SkyWalking as an APM is impossible to work with Cassandra successfully, but Zipkin server can.
Another thing like GraalVM support. SkyWalking OAP would have a long way(one year, even longer) to support GraalVM. But here as Zipkin v3, this should be able to support soon. OAP has many analysis relative things, which is hard to immigrate to run on GraalVM.

One of the key point is Zipkin VS SkyWalking OAP is that, OAP is too complex for Zipkin users. It support all cases for an observability team, but Zipkin v3 could be much lighter, more stable and focus on tracing relative things. Meanwhile, we could share the experiences on both sides.

@anuraaga
Copy link
Contributor

anuraaga commented Oct 7, 2023

write to the two servers and progressively start moving to use new server.

Yeah I assume that new server for new data, old server for old until deletion will be the practical approach vs trying any migration scripts.

So I'm still wondering why is this Zipkin? From what I follow in this thread, skywalking being more than tracing can actually impose limitations such as not working with Cassandra. That also seems weird to me though because if skywalking can't support Cassandra, there should be no need to change the Zipkin data model on Cassandra. If it's intended to be tracing-only Skywalking, isn't it better to still be part of Skywalking?

@wu-sheng
Copy link
Member

wu-sheng commented Oct 7, 2023

So I'm still wondering why is this Zipkin? From what I follow in this thread, skywalking being more than tracing can actually impose limitations such as not working with Cassandra. That also seems weird to me though because if skywalking can't support Cassandra, there should be no need to change the Zipkin data model on Cassandra. If it's intended to be tracing-only Skywalking, isn't it better to still be part of Skywalking?

Whether SkyWalking community likes this, it would take a long time to make another decision from a totally another perspective. Zipkin server has this storage option, let's keep it supported.

And, I think more discussion could be made in another place.

Let's keep this PR context clear to the Cassandra itself.

@wu-sheng
Copy link
Member

wu-sheng commented Oct 7, 2023

@mrproliu Please sync and avoid the conflicts as the bugs have been fixed through another PR.

@wu-sheng
Copy link
Member

wu-sheng commented Oct 7, 2023

Let's wait for @michaelsembwever back from vacation and do the review about Cassandra codes and table structure. He knows Cassandra much better than I could ever be.


@Override
public void savePolicy(ContinuousProfilingPolicy policy) throws IOException {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you create a NotImplementedException? And throw from here?
It is better to see a clear exception rather than NPE or other strange behavior as there is nothing executed.
After all, these methods should not be called.

@michaelsembwever
Copy link
Member

In the previous version, tables were created using predefined CQL files.

Not having these makes it much harder to review the patch, and to benchmark it.
This needs the benchmark profiles so to be able to validate the scalability and superior performance characteristics of a Cassandra backend. For example in master this is traces-stress.yaml and and trace_by_service_span-stress.yaml and span_by_service-stress.yaml files.

if (keyspaceMetadata == null) {
String createKeyspaceCql = String.format(
"CREATE KEYSPACE IF NOT EXISTS %s " +
"WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neither SimpleStrategy nor rf=1 is appropriate for any production system. NetworkTopologyStrategy and rf=3 is the minimum for production.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the default settings in the Zipkin CQL file, should I updated?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they are appropriate for first-time / dev usage. but not appropriate for production usage.

it needs to be configurable.
and ideally the application should log a warning when rf<3 saying it's not appropriate for production.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if rf<3 would be got a warning log. What do you mean configurable?

columnDefinitions.add("PRIMARY KEY (" + ID_COLUMN + (CollectionUtils.isEmpty(shardKeys) ? "" : "," + Joiner.on(", ").join(shardKeys)) + ")");
}

final SQLBuilder sql = new SQLBuilder("CREATE TABLE IF NOT EXISTS " + table)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we using jdbc.SQLBuilder here? the java-driver used natively will be safer and cleaner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted.

import java.util.ArrayList;
import java.util.List;

public class BatchCQLExecutor implements InsertRequest, UpdateRequest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't do batch statements. and don't store then flush writes. this is misunderstanding C*'s design and strengths.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. I have divided each CQL query for execution.


private final Map<String, Long> lastDeletedTimeBucket = new ConcurrentHashMap<>();

public CassandraHistoryDeleteDAO(CassandraClient client, TableHelper tableHelper, CassandraTableInstaller modelInstaller, Clock clock) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are bucketing by table do we still need TimeWindowCompactionStrategy?

what is the performance impact of daily creating and dropping tables ?

The correct (best performance and availability) design here is to define the time bucket inside the one table in its primary key, define the TTL at the table/write level, and let TimeWindowCompactionStrategy do what it's best at. Here it feels like we are re-inventing the wheel at the application layer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I have updated to use the original Cassandra table settings, nothing more.

completionTraceIds.add(client.executeAsyncQuery("select " + ZipkinSpanRecord.TRACE_ID + " from " + tagTable +
" where " + ZipkinSpanRecord.QUERY + " = ?" +
" and " + ZipkinSpanRecord.TIME_BUCKET + " >= ?" +
" and " + ZipkinSpanRecord.TIME_BUCKET + " <= ? ALLOW FILTERING",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the primary key specified in the where clause here?
allow filtering without the primary key specified is a full-table scan and will not work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated to use of the latest secondary table in the v2 for searching.

final List<List<Span>> result = new ArrayList<>();
for (String table : tableHelper.getTablesWithinTTL(ZipkinSpanRecord.INDEX_NAME)) {
final PreparedStatement stmt = client.getSession().prepare("select * from " + table + " where " +
ZipkinSpanRecord.TRACE_ID + " in ?");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use IN against. the primary key. async concurrent requests instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to using multiple equals query.

Comment on lines 133 to 135
if (!indexExists(cassandraClient, table, index)) {
executeSQL(
new SQLBuilder("CREATE INDEX ")
Copy link
Member

@michaelsembwever michaelsembwever Oct 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Avoid indexes, especially legacy 2i. (v2 uses SASI, if you must better is to wait for 5.0 and use SAI, at minimum create a design that can still run without the index – true scalability as well as availability when an index needs to be rebuilt)
  2. indexExists(…) does not work in a distributed environment. it can be used as an optimisation but cannot be relied upon. use IF NOT EXISTS. use the optimisation everywhere though (CREATE TABLE too).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid indexes, especially legacy 2i. (v2 uses SASI, if you must better is to wait for 5.0 and use SAI, at minimum create a design that can still run without the index – true scalability as well as availability when an index needs to be rebuilt)

Updated, still using the SASI index for now. I could update the SAI index when Cassandra v5.0 is ready.

indexExists(…) does not work in a distributed environment. it can be used as an optimisation but cannot be relied upon. use IF NOT EXISTS. use the optimisation everywhere though (CREATE TABLE too).

Update to using the pre-defined table CQL file.

List<StorageData> storageDataList = new ArrayList<>();

for (String table : modelTables) {
final SQLBuilder sql = new SQLBuilder("SELECT * FROM " + table + " WHERE id in ")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the java-driver natively. and don't use IN against the primary key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. I have divided each CQL query for execution.

duration.getEndTimeBucket()
)) {
final SQLAndParameters sqlAndParameters = buildSQLForQueryValues(tagType, tagKey, limit, duration, table);
results.addAll(client.executeQuery(sqlAndParameters.sql() + " ALLOW FILTERING",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the primary key specified in the where clause here?
allow filtering without the primary key specified is a full-table scan and will not work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the ALLOW FILTERING has been deleted.

)) {
final SQLAndParameters sqlAndParameters = buildSQLForQueryKeys(tagType, Integer.MAX_VALUE, duration, table);
results.addAll(client.executeQuery(sqlAndParameters.sql().replaceAll("(1=1\\s+and)|(distinct)", "") + " ALLOW FILTERING",
row -> row.getString(TagAutocompleteData.TAG_KEY), sqlAndParameters.parameters()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the primary key specified in the where clause here?
allow filtering without the primary key specified is a full-table scan and will not work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the ALLOW FILTERING has been deleted.

@michaelsembwever
Copy link
Member

michaelsembwever commented Oct 11, 2023

To summarise

  • Cassandra storage is for deployments that scale. It can and should be designed to
    -- be as performant as, if not more than, other storage choices at no/low scale (bc C* is the preferred/only available db)
    -- work where the other storage won't
    -- scale linearly, so operators know it will continue to work as load increases, and costs over scale are predictable
    -- to validate any of this we need the benchmark profiles
  • batch statements, and storing and flushing of writes, should not be done
  • time bucketing at the primary key, and TTL in the table/write, is superior (and what TWCS is designed for)
  • allow filtering used without specifying the primary key must be avoided
  • using the IN clause against the primary key must be avoided
  • legacy secondary indexes must be avoided
  • SASI indexes can be optionally used, but the application needs to work (degraded mode) without them
  • C* 5.0 offers SAI indexes, superior to SASI, ok to use (but still try to design without them)
  • keyspace replication config must be configuration (and not be Simple nor rf<3 in any production)
  • you cannot rely on read+write coordination at the application layer: e.g. indexExists(…)

If you have integration tests, you can also rely upon C* server-side guardrails to check you're designing and using correctly. (Such a guardrails definition would also be valuable advice to production operators.)

@wu-sheng
Copy link
Member

@michaelsembwever Could you review for another round?

Copy link
Member

@wu-sheng wu-sheng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michaelsembwever I am going to merge this first, to unblock the Cassandra relative changes at the dependency repo side.

When you have time, feel free to review, and we will follow your recommendations.

@wu-sheng wu-sheng merged commit 04feb90 into openzipkin:zipkin-v3 Oct 30, 2023
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cassandra storage Group label for Storage components
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants