33ParallelProcessRunner::ParallelProcessRunner(QObject *parent)
34 : QObject(parent), parallelCount_(0), completedProcessCount_(0),
35 failedProcessCount_(0) {}
37ParallelProcessRunner::~ParallelProcessRunner() {
40 for (
auto* process : processes_) {
43 pendingStdout_.clear();
44 pendingStderr_.clear();
48 parallelCount_ = count;
52 for (
const auto* process : processes_) {
53 if (process && process->state() != QProcess::NotRunning) {
61 for (
auto* process : processes_) {
62 if (process && process->state() != QProcess::NotRunning) {
64 process->waitForFinished(1000);
70 const QStringList &inputData,
int chunkCount)
const {
72 std::vector<QStringList> chunks(chunkCount);
75 for (
int i = 0; i < inputData.size(); ++i) {
76 chunks[i % chunkCount].append(inputData[i]);
83 const QString &programPath,
84 const QStringList &inputData,
85 const QString &finalOutputPath) {
88 emit
error(
"Processes are already running");
92 if (inputData.isEmpty()) {
93 emit
error(
"No input data to process");
97 programPath_ = programPath;
101 int chunkCount = parallelCount_;
102 if (chunkCount <= 0) {
103 chunkCount = QThread::idealThreadCount();
104 if (chunkCount <= 0) {
110 chunkCount = std::min(chunkCount,
static_cast<int>(inputData.size()));
112 emit
progress(QString(
"Dividing %1 items into %2 chunks for parallel processing")
113 .arg(inputData.size()).arg(chunkCount));
120 emit
error(
"Failed to create temporary directory for chunk outputs");
125 for (
auto* process : processes_) {
129 tempOutputPaths_.clear();
131 completedProcessCount_ = 0;
132 failedProcessCount_ = 0;
133 pendingStdout_.clear();
134 pendingStderr_.clear();
138 for (
int i = 0; i < static_cast<int>(chunks.size()); ++i) {
139 const QStringList& chunk = chunks[i];
141 tempOutputPaths_.append(tempOutputPath);
147 QProcess* process =
new QProcess(
this);
148 process->setProgram(programPath_);
149 process->setArguments(arguments);
150 process->setProperty(
"chunkIndex", i);
152 connect(process, &QProcess::finished,
this,
153 &ParallelProcessRunner::onProcessFinished);
154 connect(process, &QProcess::errorOccurred,
this,
155 &ParallelProcessRunner::onProcessError);
156 connect(process, &QProcess::readyReadStandardOutput,
this,
157 &ParallelProcessRunner::onProcessStdOut);
158 connect(process, &QProcess::readyReadStandardError,
this,
159 &ParallelProcessRunner::onProcessStdErr);
161 processes_.push_back(process);
163 qDebug() <<
"Starting chunk" << i <<
"with" << chunk.size() <<
"items";
167 emit
progress(QString(
"Started %1 parallel processes").arg(chunks.size()));
171 return tempDir_.filePath(QString(
"output_%1.tmp").arg(chunkIndex));
174void ParallelProcessRunner::onProcessFinished(
175 int exitCode, QProcess::ExitStatus exitStatus) {
177 QProcess* process = qobject_cast<QProcess*>(sender());
178 if (!process)
return;
180 QString stderr_output;
181 drainProcessOutput(process, pendingStdout_, pendingStderr_,
182 processSource(process),
183 [
this](
const LogEntry &entry) { emit logMessage(entry); },
184 nullptr, &stderr_output);
186 int chunkIndex = process->property(
"chunkIndex").toInt();
187 int totalChunks =
static_cast<int>(processes_.size());
189 if (exitStatus != QProcess::NormalExit || exitCode != 0) {
190 failedProcessCount_++;
191 QString errorMsg = QString(
"Chunk %1 failed with exit code %2:\n%3")
192 .arg(chunkIndex).arg(exitCode).arg(stderr_output);
193 errorMessages_.append(errorMsg);
194 qDebug() << errorMsg;
195 emit chunkFailed(chunkIndex, errorMsg);
197 completedProcessCount_++;
198 qDebug() <<
"Chunk" << chunkIndex <<
"completed successfully";
199 emit chunkCompleted(chunkIndex, totalChunks);
202 checkAllProcessesCompleted();
204 pendingStdout_.remove(process);
205 pendingStderr_.remove(process);
208void ParallelProcessRunner::onProcessError(QProcess::ProcessError processError) {
209 QProcess* process = qobject_cast<QProcess*>(sender());
210 if (!process)
return;
212 int chunkIndex = process->property(
"chunkIndex").toInt();
213 failedProcessCount_++;
215 drainProcessOutput(process, pendingStdout_, pendingStderr_,
216 processSource(process),
217 [
this](
const LogEntry &entry) { emit logMessage(entry); });
219 QString errorMessage;
220 QString systemError = process->errorString();
221 switch (processError) {
222 case QProcess::FailedToStart:
223 errorMessage = QString(
"Chunk %1: Failed to start process: %2")
224 .arg(chunkIndex).arg(systemError);
226 case QProcess::Crashed:
227 errorMessage = QString(
"Chunk %1: Process crashed: %2")
228 .arg(chunkIndex).arg(systemError);
231 errorMessage = QString(
"Chunk %1: Process error %2: %3")
232 .arg(chunkIndex).arg(processError).arg(systemError);
236 errorMessages_.append(errorMessage);
237 qDebug() << errorMessage;
238 emit chunkFailed(chunkIndex, errorMessage);
240 emitErrorLog(processSource(process), errorMessage,
241 [
this](
const LogEntry &entry) { emit logMessage(entry); });
243 checkAllProcessesCompleted();
246void ParallelProcessRunner::onProcessStdOut() {
247 QProcess* process = qobject_cast<QProcess*>(sender());
248 if (!process)
return;
249 emitParsedOutput(pendingStdout_, process,
250 QString::fromUtf8(process->readAllStandardOutput()),
251 processSource(process),
false,
252 [
this](
const LogEntry &entry) { emit logMessage(entry); });
255void ParallelProcessRunner::onProcessStdErr() {
256 QProcess* process = qobject_cast<QProcess*>(sender());
257 if (!process)
return;
258 emitParsedOutput(pendingStderr_, process,
259 QString::fromUtf8(process->readAllStandardError()),
260 processSource(process),
true,
261 [
this](
const LogEntry &entry) { emit logMessage(entry); });
264QString ParallelProcessRunner::processSource(QProcess *process)
const {
265 const int chunkIndex = process ? process->property(
"chunkIndex").toInt() : 0;
266 QString base = QFileInfo(programPath_).baseName();
267 if (base.isEmpty()) {
270 if (processes_.size() > 1) {
271 return QString(
"%1[%2]").arg(base).arg(chunkIndex);
276void ParallelProcessRunner::checkAllProcessesCompleted() {
277 int totalProcesses =
static_cast<int>(processes_.size());
278 int finishedProcesses = completedProcessCount_ + failedProcessCount_;
280 if (finishedProcesses < totalProcesses) {
281 emit progress(QString(
"Completed %1/%2 chunks")
282 .arg(finishedProcesses).arg(totalProcesses));
287 onAllCompleted(completedProcessCount_, failedProcessCount_, totalProcesses);
291 int successCount,
int failureCount,
int totalCount) {
293 if (failureCount == totalCount) {
294 emit
error(QString(
"All %1 chunks failed").arg(totalCount));
298 if (successCount == 0) {
299 emit
error(
"No chunks completed successfully");
303 emit
progress(
"All chunks completed, merging results...");
310 emit
error(QString(
"Failed to merge chunk outputs: %1").arg(mergeError));
314 const double seconds = elapsed_.isValid()
315 ?
static_cast<double>(elapsed_.elapsed()) / 1000.0
318 QString(
"Results merged successfully in %1s")
319 .arg(QString::number(seconds,
'f', 2)));
Generic parallel process runner for executing multiple instances of a program.
Helpers for draining process output into log entries.
virtual std::vector< QStringList > chunkInputData(const QStringList &inputData, int chunkCount) const
Divide input data into chunks for parallel processing.
void error(const QString &errorMessage)
Emitted when an error occurs that prevents execution.
QString getTempOutputPath(int chunkIndex) const
Get path to temporary output file for a chunk.
void progress(const QString &message)
Emitted with progress updates (e.g., "Completed 3/8 chunks").
virtual bool mergeResults(const QStringList &tempOutputPaths, const QString &finalOutputPath, QString &errorMessage)=0
Merge outputs from all chunks into final result.
QString finalOutputPath_
Final output file path.
void setParallelCount(int count)
Set number of parallel processes (0 = auto-detect from CPU cores).
void cancel()
Cancel all running processes.
virtual QStringList prepareProcessArguments(int chunkIndex, const QStringList &chunkData, const QString &tempOutputPath)=0
Prepare command-line arguments for a specific chunk.
virtual void onAllCompleted(int successCount, int failureCount, int totalCount)
Called when all processes have completed (success or failure).
bool isRunning() const
Check if any process is currently running.
QStringList errorMessages_
Aggregated error messages.
void runParallel(const QString &programPath, const QStringList &inputData, const QString &finalOutputPath)
Start parallel execution.
QTemporaryDir tempDir_
Temporary directory for chunk outputs.