Skip to content

Commit

Permalink
feat: add a view for failed replication items
Browse files Browse the repository at this point in the history
  • Loading branch information
fuziontech committed Jan 9, 2025
1 parent c93fa9e commit 6a80f32
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 120 deletions.
235 changes: 141 additions & 94 deletions frontend/src/pages/Replication/Replication.tsx
Original file line number Diff line number Diff line change
@@ -1,126 +1,173 @@
import { Table, Button, notification, Typography, Tooltip, Spin } from 'antd'
import { Table, Button, notification, Typography, Tooltip, Spin, Select, Space } from 'antd'
import { usePollingEffect } from '../../utils/usePollingEffect'
import React, { useState } from 'react'
import React, { useState, useEffect } from 'react'
import { ColumnType } from 'antd/es/table'

const { Paragraph } = Typography

interface RunningQueryData {
query: string
read_rows: number
read_rows_readable: string
query_id: string
total_rows_approx: number
total_rows_approx_readable: string
elapsed: number
memory_usage: string
// One node entry inside a cluster, as returned by GET /api/clusters.
// Field names mirror the columns of ClickHouse's system.clusters table —
// NOTE(review): assumed from the naming; confirm against the backend serializer.
interface ClusterNode {
  cluster: string // name of the cluster this node belongs to
  shard_num: number
  shard_weight: number
  replica_num: number
  host_name: string // used by the backend to open a direct connection (see get_client)
  host_address: string
  port: number
  is_local: number // presumably 0/1 rather than boolean — TODO confirm
  user: string
  default_database: string
  errors_count: number
  slowdowns_count: number
  estimated_recovery_time: number
}

function KillQueryButton({ queryId }: any) {
const [isLoading, setIsLoading] = useState(false)
const [isKilled, setIsKilled] = useState(false)
// A named cluster together with all of its nodes; the shape of each
// element of the GET /api/clusters response consumed by fetchClusters.
interface Cluster {
  cluster: string // cluster name; also used as the <Select> option key/value
  nodes: ClusterNode[]
}

const killQuery = async () => {
setIsLoading(true)
try {
const res = await fetch(`/api/analyze/${queryId}/kill_query`, {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
body: new URLSearchParams({
query_id: queryId,
}),
})
setIsKilled(true)
setIsLoading(false)
return await res.json()
} catch (err) {
setIsLoading(false)
notification.error({
message: 'Killing query failed',
})
}
}
return (
<>
{isKilled ? (
<Button disabled>Query killed</Button>
) : (
<Button danger onClick={killQuery} loading={isLoading}>
Kill query
</Button>
)}
</>
)
// One row of a replica's replication queue, as served by
// GET /api/replication/?cluster=…. Rows with a truthy `error` are
// considered failed and are the only ones kept for display.
interface ReplicationQueueItem {
  host_name: string
  database: string
  table: string
  position: number // position within the replication queue; part of the table rowKey
  error: string // error text; presumably empty/falsy for healthy entries — TODO confirm
  last_attempt_time: string
  num_attempts: number
  type: string // queue entry type — NOTE(review): semantics not visible here; confirm with backend
}

export default function Replication() {
const [runningQueries, setRunningQueries] = useState([])
const [loadingRunningQueries, setLoadingRunningQueries] = useState(false)
const [replicationQueue, setReplicationQueue] = useState<ReplicationQueueItem[]>([])
const [loadingReplication, setLoadingReplication] = useState(false)
const [selectedCluster, setSelectedCluster] = useState<string>('')
const [clusters, setClusters] = useState<Cluster[]>([])
const [loadingClusters, setLoadingClusters] = useState(false)

useEffect(() => {
const fetchClusters = async () => {
setLoadingClusters(true)
try {
const res = await fetch('/api/clusters')
const resJson: Cluster[] = await res.json()
setClusters(resJson)
if (resJson.length > 0) {
setSelectedCluster(resJson[0].cluster)
}
} catch (err) {
notification.error({
message: 'Failed to fetch clusters',
description: 'Please try again later',
})
}
setLoadingClusters(false)
}
fetchClusters()
}, [])

const columns: ColumnType<RunningQueryData>[] = [
const columns: ColumnType<ReplicationQueueItem>[] = [
{
title: 'Host',
dataIndex: 'host_name',
key: 'host_name',
},
{
title: 'Query',
dataIndex: 'normalized_query',
key: 'query',
render: (_: any, item) => {
let index = 0
return (
<Paragraph
style={{ maxWidth: '100%', fontFamily: 'monospace' }}
ellipsis={{
rows: 2,
expandable: true,
}}
>
{item.query.replace(/(\?)/g, () => {
index = index + 1
return '$' + index
})}
</Paragraph>
)
},
title: 'Database',
dataIndex: 'database',
key: 'database',
},
{ title: 'User', dataIndex: 'user' },
{ title: 'Elapsed time', dataIndex: 'elapsed' },
{
title: 'Rows read',
dataIndex: 'read_rows',
render: (_: any, item) => (
<Tooltip title={`~${item.read_rows}/${item.total_rows_approx}`}>
~{item.read_rows_readable}/{item.total_rows_approx_readable}
</Tooltip>
title: 'Table',
dataIndex: 'table',
key: 'table',
},
{
title: 'Error',
dataIndex: 'error',
key: 'error',
render: (error: string) => (
<Paragraph
style={{ maxWidth: '400px', color: 'red' }}
ellipsis={{
rows: 2,
expandable: true,
}}
>
{error}
</Paragraph>
),
},
{ title: 'Memory Usage', dataIndex: 'memory_usage' },
{
title: 'Actions',
render: (_: any, item) => <KillQueryButton queryId={item.query_id} />,
title: 'Last Attempt',
dataIndex: 'last_attempt_time',
key: 'last_attempt_time',
},
{
title: 'Attempts',
dataIndex: 'num_attempts',
key: 'num_attempts',
},
{
title: 'Type',
dataIndex: 'type',
key: 'type',
},
]

usePollingEffect(
async () => {
setLoadingRunningQueries(true)
const res = await fetch('/api/analyze/running_queries')
const resJson = await res.json()
setRunningQueries(resJson)
setLoadingRunningQueries(false)
if (!selectedCluster) return

setLoadingReplication(true)
try {
const res = await fetch(`/api/replication/?cluster=${selectedCluster}`)
const resJson = await res.json()
// Filter for failed items only
const failedItems = resJson.filter((item: ReplicationQueueItem) => item.error)
setReplicationQueue(failedItems)
} catch (err) {
notification.error({
message: 'Failed to fetch replication queue',
description: 'Please try again later',
})
}
setLoadingReplication(false)
},
[],
[selectedCluster],
{ interval: 5000 }
)

return (
<>
<h1 style={{ textAlign: 'left' }}>Running queries {loadingRunningQueries ? <Spin /> : null}</h1>
<br />
<Table
columns={columns}
dataSource={runningQueries}
loading={runningQueries.length == 0 && loadingRunningQueries}
/>
<Space direction="vertical" size="large" style={{ width: '100%' }}>
<Space>
<h1 style={{ margin: 0 }}>
{`${replicationQueue.length}`} Failed Replication Queue Items
</h1>
{loadingReplication && <Spin />}
</Space>

<Select
style={{ width: 200 }}
value={selectedCluster}
onChange={setSelectedCluster}
loading={loadingClusters}
placeholder="Select a cluster"
>
{clusters.map((cluster) => (
<Select.Option key={cluster.cluster} value={cluster.cluster}>
{cluster.cluster}
</Select.Option>
))}
</Select>

<Table
columns={columns}
dataSource={replicationQueue}
loading={replicationQueue.length === 0 && loadingReplication}
rowKey={(record) => `${record.host_name}-${record.table}-${record.position}`}
/>
</Space>
</>
)
}
9 changes: 8 additions & 1 deletion housewatch/api/cluster.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import structlog
from rest_framework.decorators import action
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
Expand All @@ -15,3 +14,11 @@ def list(self, request: Request) -> Response:

def retrieve(self, request: Request, pk: str) -> Response:
return Response(clusters.get_cluster(pk))


class ReplicationViewset(GenericViewSet):
    """Read-only endpoint exposing a cluster's replication queue."""

    def list(self, request: Request) -> Response:
        """Return replication-queue entries for the cluster named in ``?cluster=``.

        Responds with HTTP 400 when the required ``cluster`` query
        parameter is missing or empty.
        """
        cluster_name = request.query_params.get("cluster")
        if cluster_name:
            return Response(list(clusters.get_replication_queues(cluster_name)))
        return Response({"error": "cluster parameter is required"}, status=400)
60 changes: 36 additions & 24 deletions housewatch/clickhouse/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from typing import Dict, Optional
from clickhouse_pool import ChPool
from clickhouse_driver import Client
Expand All @@ -23,6 +22,24 @@
)


def get_client(node: Optional[Dict] = None):
    """Return a ClickHouse client.

    With ``node`` given, open a direct connection to that node's
    ``host_name`` using the project-wide connection settings; without it,
    hand out a client from the shared connection pool.
    """
    if not node:
        return pool.get_client()
    return Client(
        host=node["host_name"],
        database=settings.CLICKHOUSE_DATABASE,
        user=settings.CLICKHOUSE_USER,
        secure=settings.CLICKHOUSE_SECURE,
        ca_certs=settings.CLICKHOUSE_CA,
        verify=settings.CLICKHOUSE_VERIFY,
        settings={"max_result_rows": "2000"},
        send_receive_timeout=30,
        password=settings.CLICKHOUSE_PASSWORD,
    )


def run_query_on_shards(
query: str,
params: Dict[str, str | int] = {},
Expand All @@ -38,24 +55,13 @@ def run_query_on_shards(
for shard, node in nodes:
params["shard"] = shard
final_query = query % (params or {}) if substitute_params else query
client = Client(
host=node["host_address"],
database=settings.CLICKHOUSE_DATABASE,
user=settings.CLICKHOUSE_USER,
secure=settings.CLICKHOUSE_SECURE,
ca_certs=settings.CLICKHOUSE_CA,
verify=settings.CLICKHOUSE_VERIFY,
settings={"max_result_rows": "2000"},
send_receive_timeout=30,
password=settings.CLICKHOUSE_PASSWORD,
)
client = get_client(node)
result = client.execute(final_query, settings=query_settings, with_column_types=True, query_id=query_id)
response = []
for res in result[0]:
item = {}
for index, key in enumerate(result[1]):
item[key[0]] = res[index]

response.append(item)
responses.append((shard, response))
return response
Expand All @@ -68,7 +74,7 @@ def run_query(
query_id: Optional[str] = None,
use_cache: bool = True, # defaulting to True for now for simplicity, but ideally we should default this to False
substitute_params: bool = True,
cluster: Optional[str] = None,
node: Optional[Dict] = None,
):
final_query = query % (params or {}) if substitute_params else query
query_hash = ""
Expand All @@ -79,18 +85,24 @@ def run_query(
if cached_result:
return json.loads(cached_result)

with pool.get_client() as client:
response = []
if node:
client = get_client(node)
result = client.execute(final_query, settings=settings, with_column_types=True, query_id=query_id)
response = []
for res in result[0]:
item = {}
for index, key in enumerate(result[1]):
item[key[0]] = res[index]
else:
with pool.get_client() as client:
result = client.execute(final_query, settings=settings, with_column_types=True, query_id=query_id)

response.append(item)
if use_cache:
cache.set(query_hash, json.dumps(response, default=str), timeout=60 * 5)
return response
for res in result[0]:
item = {}
for index, key in enumerate(result[1]):
item[key[0]] = res[index]
response.append(item)

if use_cache:
cache.set(query_hash, json.dumps(response, default=str), timeout=60 * 5)

return response


existing_system_tables = [row["name"] for row in run_query(EXISTING_TABLES_SQL, use_cache=False)]
Loading

0 comments on commit 6a80f32

Please sign in to comment.