small edit on sql
jbcodeforce committed Nov 11, 2024
1 parent cf5972f commit 6f6b56e
Showing 3 changed files with 22 additions and 6 deletions.
1 change: 0 additions & 1 deletion docs/architecture/flink-sql.md
@@ -80,7 +80,6 @@ Note that the SQL Client executes each INSERT INTO statement as a separate Flink

In streaming, the "ORDER BY" statement must use a time attribute in ascending order as its primary sort key, while in batch processing it can be applied to any record field.
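A minimal sketch, assuming a hypothetical `Orders` table whose `order_time` column is the event-time attribute:

```sql
-- streaming and batch: sort on the event-time attribute, ascending
SELECT * FROM Orders ORDER BY order_time ASC;

-- batch only: sort on an arbitrary field
SELECT * FROM Orders ORDER BY amount DESC;
```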


### Data lifecycle

In a pure Kafka integration architecture, such as Confluent Cloud, the data lifecycle follows these steps:
21 changes: 19 additions & 2 deletions docs/coding/flink-sql.md
@@ -41,7 +41,7 @@ Use one of the following approaches:
USE `marketplace`;
SHOW TABLES;
SHOW JOBS;

DESCRIBE tablename;
```

* Write SQL statements and test them with the Java SQL runner. The class is in the [https://github.com/jbcodeforce/flink-studies/tree/master/flink-java/sql-runner](https://github.com/jbcodeforce/flink-studies/tree/master/flink-java/sql-runner) folder.
@@ -67,7 +67,7 @@ Data Definition Language (DDL) are statements to define metadata in Flink SQL by
);
```

???- info "how to join two tables on key within time and store in target table in SQL?"
???- info "how to join two tables on a key within a time window and store results in target table?"
```sql
create table Transactions (ts TIMESTAMP(3), tid BIGINT, amount INT);
create table Payments (ts TIMESTAMP(3), tid BIGINT, type STRING);
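-- A minimal sketch (the Matched sink table is hypothetical; assumes watermarks are
-- defined on ts so the interval predicate behaves as a streaming interval join):
create table Matched (ts TIMESTAMP(3), tid BIGINT, amount INT, type STRING);

insert into Matched
select t.ts, t.tid, t.amount, p.type
from Transactions t join Payments p
  on t.tid = p.tid
 and p.ts between t.ts and t.ts + INTERVAL '10' MINUTE;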
@@ -166,8 +166,19 @@ Data Definition Language (DDL) are statements to define metadata in Flink SQL by
alter table flight_schedules add(dt string);
```

???- question "Create a table as another by inserting record from another table with similar schema - select (CTAS)"
By using a primary key:

```sql
create table shoe_customer_keyed(
primary key(id) not enforced
) distributed by(id) into 1 buckets
as select id, first_name, last_name, email from shoe_customers;
```

???- question "How to generate data using [Flink Faker](https://github.com/knaufk/flink-faker)?"
Create a table whose records are generated by the `faker` connector using [DataFaker expressions](https://github.com/datafaker-net/datafaker).
Valid only with OSS Flink or on-premises deployments.

```sql
CREATE TABLE `bounded_pageviews` (
@@ -188,6 +199,12 @@ Data Definition Language (DDL) are statements to define metadata in Flink SQL by
```
This only works in a customized Flink SQL client that includes the flink-faker jar.

???- info "Generate data with dataGen for Flink OSS"
[Use DataGen to do in-memory data generation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/datagen/)
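A minimal sketch of a `datagen`-backed table (column names and rates are illustrative):

```sql
CREATE TABLE orders_gen (
  order_id BIGINT,
  price    DOUBLE,
  ts       TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.order_id.kind' = 'sequence',
  'fields.order_id.start' = '1',
  'fields.order_id.end' = '1000',
  'fields.price.min' = '1',
  'fields.price.max' = '100'
);
```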

???- question "How to generate test data to Confluent Cloud Flink?"
Use Kafka Connector with DataGen. Those connector exists with a lot of different pre-defined model. Also it is possible to define custom Avro schema and then use predicates to generate data. There is a [Produce sample data quick start tutorial from the Confluent Cloud home page](https://docs.confluent.io/cloud/current/connectors/cc-datagen-source.html). See also [this readme](2933https://github.com/jbcodeforce/flink-studies/tree/master/flink-sql/01-confluent-kafka-local-flink).

???- question "How to transfer the source timestamp to another table"
As `$rowtime` is the timestamp of the record in Kafka, it may be useful to propagate the source timestamp to the downstream topic.
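A minimal sketch on Confluent Cloud Flink, where `$rowtime` exposes the Kafka record timestamp (the `orders` and `enriched_orders` names are illustrative):

```sql
create table enriched_orders (
  order_id  BIGINT,
  source_ts TIMESTAMP_LTZ(3)
);

insert into enriched_orders
select order_id, $rowtime as source_ts
from orders;
```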

6 changes: 3 additions & 3 deletions flink-sql/01-confluent-kafka-local-flink/README.md
@@ -81,7 +81,7 @@ confluent kafka topic create pageviews --cluster <cluster-id>
confluent kafka cluster describe <cluster-id>
```

* Use the local same local docker compose with just task manager, job manager and SQL client containers
* Use the same local docker compose with just task manager, job manager and SQL client containers
* Create a table to connect to Kafka; change the attributes API_KEY, API_SECRETS and BOOTSTRAP_SERVER

```sql
@@ -104,7 +104,7 @@ CREATE TABLE pageviews_kafka (
);
```

* Add a table to generate records
* Add a table to generate records using the Flink SQL client

```sql
CREATE TABLE `pageviews` (
@@ -131,5 +131,5 @@ INSERT INTO pageviews_kafka SELECT * FROM pageviews;

### Problems

10/08/24 the module org.apache.kafka.common.security.plain.PlainLoginModule is missing in job and task managers. We need to add libraries some how verify if these are the good paths in the dockerfile of sql-client. This is not aligned with https://github.com/confluentinc/learn-apache-flink-101-exercises/blob/master/sql-client/Dockerfile
10/08/24 the module org.apache.kafka.common.security.plain.PlainLoginModule is missing in job and task managers. We need to add the libraries; verify that the paths in the sql-client Dockerfile are correct. This is not aligned with https://github.com/confluentinc/learn-apache-flink-101-exercises/blob/master/sql-client/Dockerfile
