Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Flink job modify_time change with duration bug fixed #3188 #3211

Merged
merged 3 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ git clone git@github.com:apache/incubator-streampark.git
cd incubator-streampark
./build.sh
```
🗄️Deatils:how to [Local Development and Debugging](https://streampark.apache.org/docs/user-guide/local%20development%20and%20debugging)

🗄 Deatils:how to [Development](https://streampark.apache.org/docs/user-guide/development)
## 🧑‍💻 Downloads

Download address for run-directly software package : [https://streampark.apache.org/download](https://streampark.apache.org/download)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ object PropertiesUtils extends Logger {
buffer.toString()
}

private[this] def eachAppendYamlItem(
prefix: String,
private[this] def eachYamlItem(
k: String,
v: Any,
proper: collection.mutable.Map[String, String]): Map[String, String] = {
prefix: String = "",
proper: MutableMap[String, String] = MutableMap[String, String]()): Map[String, String] = {
v match {
case map: JavaLinkedMap[String, Any] =>
map
.flatMap(
x => {
prefix match {
case "" => eachAppendYamlItem(k, x._1, x._2, proper)
case other => eachAppendYamlItem(s"$other.$k", x._1, x._2, proper)
case "" => eachYamlItem(x._1, x._2, k, proper)
case other => eachYamlItem(x._1, x._2, s"$other.$k", proper)
}
})
.toMap
Expand All @@ -84,11 +84,10 @@ object PropertiesUtils extends Logger {

def fromYamlText(text: String): Map[String, String] = {
try {
val map = MutableMap[String, String]()
new Yaml()
.load(text)
.asInstanceOf[java.util.Map[String, Map[String, Any]]]
.flatMap(x => eachAppendYamlItem("", x._1, x._2, map))
.flatMap(x => eachYamlItem(x._1, x._2))
.toMap
} catch {
case e: IOException =>
Expand Down Expand Up @@ -148,11 +147,10 @@ object PropertiesUtils extends Logger {
inputStream != null,
s"[StreamPark] fromYamlFile: Properties inputStream must not be null")
try {
val map = MutableMap[String, String]()
new Yaml()
.load(inputStream)
.asInstanceOf[java.util.Map[String, Map[String, Any]]]
.flatMap(x => eachAppendYamlItem("", x._1, x._2, map))
.flatMap(x => eachYamlItem(x._1, x._2))
.toMap
} catch {
case e: IOException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ set names utf8mb4;
set foreign_key_checks = 0;

alter table `t_flink_app`
add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null;

add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
-- modify_time change with duration #3188
modify column `modify_time` datetime not null default current_timestamp comment 'modify time';
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,25 @@
-- Table of t_resource
-- ----------------------------
create sequence "public"."streampark_t_resource_id_seq"
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;

create table "public"."t_resource" (
"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
"resource_name" varchar(128) collate "pg_catalog"."default" not null,
"resource_type" int4,
"resource_path" varchar(255) default null,
"resource" text collate "pg_catalog"."default",
"engine_type" int4,
"main_class" varchar(255) collate "pg_catalog"."default",
"description" text collate "pg_catalog"."default" default null,
"creator_id" int8 not null,
"connector_required_options" text default null,
"connector_optional_options" text default null,
"team_id" int8 not null,
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
)
;
"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
"resource_name" varchar(128) collate "pg_catalog"."default" not null,
"resource_type" int4,
"resource_path" varchar(255) default null,
"resource" text collate "pg_catalog"."default",
"engine_type" int4,
"main_class" varchar(255) collate "pg_catalog"."default",
"description" text collate "pg_catalog"."default" default null,
"creator_id" int8 not null,
"connector_required_options" text default null,
"connector_optional_options" text default null,
"team_id" int8 not null,
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
);

comment on column "public"."t_resource"."id" is 'Resource id';
comment on column "public"."t_resource"."resource_name" is 'Resource name';
comment on column "public"."t_resource"."resource_type" is '0:app 1:common 2:connector 3:format 4:udf';
Expand All @@ -52,21 +52,21 @@ comment on column "public"."t_resource"."modify_time" is 'modify time';

alter table "public"."t_resource" add constraint "t_resource_pkey" primary key ("id");
create index "un_team_dname_inx" on "public"."t_resource" using btree (
"team_id" "pg_catalog"."int8_ops" asc nulls last,
"resource_name" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc nulls last
);
"team_id" "pg_catalog"."int8_ops" asc nulls last,
"resource_name" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc nulls last
);

alter table "public"."t_flink_sql"
add column "team_resource" varchar(64) default null;
add column "team_resource" varchar(64) default null;

alter table "public"."t_flink_app"
add column "probing" boolean default false;
add column "probing" boolean default false;

alter table "public"."t_flink_cluster"
add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
add column "start_time" timestamp(6) collate "pg_catalog"."default",
add column "end_time" timestamp(6) collate "pg_catalog"."default",
add column "alert_id" int8 collate "pg_catalog"."default";
add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
add column "start_time" timestamp(6) collate "pg_catalog"."default",
add column "end_time" timestamp(6) collate "pg_catalog"."default",
add column "alert_id" int8 collate "pg_catalog"."default";

insert into "public"."t_menu" values (120400, 120000, 'menu.resource', '/flink/resource', 'flink/resource/View', null, 'apartment', '0', '1', 3, now(), now());
insert into "public"."t_menu" values (110401, 110400, 'add', null, null, 'token:add', null, '1', '1', null, now(), now());
Expand All @@ -91,16 +91,16 @@ comment on column "public"."t_user"."login_type" is 'login type 0:password 1:lda
-- Table of t_flink_gateway
-- ----------------------------
create sequence "public"."streampark_t_flink_gateway_id_seq"
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;

create table "public"."t_flink_gateway" (
"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
"gateway_name" varchar(128) collate "pg_catalog"."default" not null,
"description" text collate "pg_catalog"."default" default null,
"gateway_type" int4,
"address" varchar(150) collate "pg_catalog"."default",
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
"gateway_name" varchar(128) collate "pg_catalog"."default" not null,
"description" text collate "pg_catalog"."default" default null,
"gateway_type" int4,
"address" varchar(150) collate "pg_catalog"."default",
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
);
comment on column "public"."t_flink_gateway"."id" is 'The id of the gateway';
comment on column "public"."t_flink_gateway"."gateway_name" is 'The name of the gateway';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ create table if not exists `t_flink_app` (
`option_state` tinyint default null,
`tracking` tinyint default null,
`create_time` datetime not null default current_timestamp comment 'create time',
`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time',
`modify_time` datetime not null default current_timestamp comment 'modify time',
`option_time` datetime default null,
`release` tinyint default 1,
`build` tinyint default 1,
Expand Down