-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfilequeue.h
150 lines (133 loc) · 5.33 KB
/
filequeue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#ifndef FILEQUEUE_H
#define FILEQUEUE_H
#include <atomic>

#include <QDebug>
#include <QMap>
#include <QMutex>
#include <QNetworkAccessManager>
#include <QNetworkReply>
#include <QObject>
#include <QQueue>
#include <QString>
#include <QThread>
#include <QWaitCondition>

#include "cosclient.h"
/**
 * Describes one queued COS operation handled by NetworkRequestManager.
 *
 * Only the fields relevant to the selected method are filled in by the
 * manager's add*Request() functions; the others stay empty.
 */
struct RequestInfo {
    QString key;        ///< Object key; set for upload (0) and download (1) requests.
    QString localPath;  ///< Local file path; set for upload (0) and download (1) requests.
    QString versionId;  ///< Object version; set for download (1) and delete (3) requests, may be empty.
    QString RemotePath; ///< Remote path; set for copy (2) and delete (3) requests.
    QString copyto;     ///< First argument handed to putObjectCopy; set for copy (2) requests.
    /// Operation selector: 0 = put/upload, 1 = save to local, 2 = put-object-copy,
    /// 3 = delete. Initialized so a default-constructed RequestInfo never carries
    /// an indeterminate value (0 matches what value-initialization already gave).
    int methodId = 0;
};
class NetworkRequestManager : public QObject {
Q_OBJECT
public:
NetworkRequestManager(const COSConfig &config, QObject *parent = nullptr)
: QObject(parent), cosConfig(config), maxConcurrentRequests(3) {}
void addPutObjectRequest(const QString &localPath, const QString &key, int fileTaskId, QMap<QString, QString> metaData) {
QMutexLocker locker(&mutex);
RequestInfo requestInfo;
requestInfo.key = key;
requestInfo.localPath = localPath;
requestInfo.methodId = 0;
requestQueue.enqueue({requestInfo , fileTaskId});
if (activeRequests < maxConcurrentRequests) {
startNextRequest();
}
}
void addSave2LocalRequest(const QString &key, const QString &localPath, int fileTaskId, QString versionId="") {
QMutexLocker locker(&mutex);
RequestInfo requestInfo;
requestInfo.key = key;
requestInfo.localPath = localPath;
requestInfo.versionId = versionId;
requestInfo.methodId = 1;
requestQueue.enqueue({requestInfo, fileTaskId});
if (activeRequests < maxConcurrentRequests) {
startNextRequest();
}
}
void addPutObjectCopyRequest(const QString ©to, const QString &RemotePath, int fileTaskId) {
QMutexLocker locker(&mutex);
RequestInfo requestInfo;
requestInfo.copyto = copyto;
requestInfo.RemotePath = RemotePath;
requestInfo.methodId = 2;
requestQueue.enqueue({requestInfo, fileTaskId});
if (activeRequests < maxConcurrentRequests) {
startNextRequest();
}
}
void addDeleteObjectRequest(const QString &RemotePath, const QString &versionId, int fileTaskId) {
QMutexLocker locker(&mutex);
RequestInfo requestInfo;
requestInfo.RemotePath = RemotePath;
requestInfo.versionId = versionId;
requestInfo.methodId = 3;
requestQueue.enqueue({requestInfo, fileTaskId});
if (activeRequests < maxConcurrentRequests) {
startNextRequest();
}
}
signals:
void requestProgress(int fileTaskId, qint64 bytesReceived, qint64 bytesTotal);
void requestFinished(int fileTaskId, QNetworkReply::NetworkError error);
void sendRequestInfo(RequestInfo requestInfo);
public slots:
void setMaxConcurrentRequests(int max) {
qDebug() << "setMaxConcurrentRequests: " << max;
QMutexLocker locker(&mutex);
maxConcurrentRequests.store(max);
}
private slots:
void onRequestFinished(int fileTaskId, QNetworkReply::NetworkError error,RequestInfo requestInfo) {
emit requestFinished(fileTaskId, error);
emit sendRequestInfo(requestInfo);
QMutexLocker locker(&mutex);
activeRequests--;
startNextRequest();
}
void onRequestProgress(int fileTaskId, qint64 bytesReceived, qint64 bytesTotal) {
qDebug() << "progress: " << bytesReceived << " / " << bytesTotal;
emit requestProgress(fileTaskId, bytesReceived, bytesTotal);
}
private:
void startNextRequest() {
if (requestQueue.isEmpty()) {
return;
}
auto requestPair = requestQueue.dequeue();
auto requestInfo = requestPair.first;
int fileTaskId = requestPair.second;
QThread *thread = QThread::create([=] {
COSClient cosClient(cosConfig);
connect(&cosClient, &COSClient::progress, this, [=](qint64 bytesReceived, qint64 bytesTotal) {
onRequestProgress(fileTaskId, bytesReceived, bytesTotal);
},Qt::BlockingQueuedConnection);
connect(&cosClient, &COSClient::finished, this, [=](QNetworkReply::NetworkError error){
onRequestFinished(fileTaskId, error,requestInfo);
},Qt::BlockingQueuedConnection);
if (requestInfo.methodId == 0) {
cosClient.blockingPutFinishSignal=true;
cosClient.multiUpload(requestInfo.key, requestInfo.localPath, QMap<QString, QString>());
} else if (requestInfo.methodId == 1) {
if(!requestInfo.versionId.isEmpty()){
QMap<QString, QString> metaData;
cosClient.save2Local(requestInfo.key, requestInfo.localPath,requestInfo.versionId,metaData);
}else{
cosClient.save2LocalWithoutVersion(requestInfo.key, requestInfo.localPath);}
}else if (requestInfo.methodId == 2) {
cosClient.putObjectCopy(requestInfo.copyto,requestInfo.RemotePath);
}else if (requestInfo.methodId == 3) {
cosClient.deleteObject(requestInfo.RemotePath,requestInfo.versionId);
}
//requestFunc();
});
connect(thread, &QThread::finished, thread, &QThread::deleteLater);
thread->start();
activeRequests++;
}
COSConfig cosConfig;
QQueue<QPair<RequestInfo, int>> requestQueue;
QMutex mutex;
int activeRequests = 0;
std::atomic<int> maxConcurrentRequests;
};
#endif // FILEQUEUE_H