ACAV f0ba4b7c9529
Abstract Syntax Tree (AST) visualization tool for C, C++, and Objective-C
Loading...
Searching...
No Matches
ParallelProcessRunner.cpp
Go to the documentation of this file.
1/*$!{
2* Aurora Clang AST Viewer (ACAV)
3*
4* Copyright (c) 2026 Min Liu
5* Copyright (c) 2026 Michael David Adams
6*
7* SPDX-License-Identifier: GPL-2.0-or-later
8*
9* This program is free software; you can redistribute it and/or modify
10* it under the terms of the GNU General Public License as published by
11* the Free Software Foundation; either version 2 of the License, or
12* (at your option) any later version.
13*
14* This program is distributed in the hope that it will be useful,
15* but WITHOUT ANY WARRANTY; without even the implied warranty of
16* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17* GNU General Public License for more details.
18*
19* You should have received a copy of the GNU General Public License along
20* with this program; if not, see <https://www.gnu.org/licenses/>.
21}$!*/
22
27#include <QDebug>
28#include <QFileInfo>
29#include <QThread>
30
31namespace acav {
32
33ParallelProcessRunner::ParallelProcessRunner(QObject *parent)
34 : QObject(parent), parallelCount_(0), completedProcessCount_(0),
35 failedProcessCount_(0) {}
36
37ParallelProcessRunner::~ParallelProcessRunner() {
38 cancel();
39 // Clean up process objects
40 for (auto* process : processes_) {
41 delete process;
42 }
43 pendingStdout_.clear();
44 pendingStderr_.clear();
45}
46
48 parallelCount_ = count;
49}
50
52 for (const auto* process : processes_) {
53 if (process && process->state() != QProcess::NotRunning) {
54 return true;
55 }
56 }
57 return false;
58}
59
61 for (auto* process : processes_) {
62 if (process && process->state() != QProcess::NotRunning) {
63 process->kill();
64 process->waitForFinished(1000);
65 }
66 }
67}
68
70 const QStringList &inputData, int chunkCount) const {
71
72 std::vector<QStringList> chunks(chunkCount);
73
74 // Round-robin distribution for even load balancing
75 for (int i = 0; i < inputData.size(); ++i) {
76 chunks[i % chunkCount].append(inputData[i]);
77 }
78
79 return chunks;
80}
81
83 const QString &programPath,
84 const QStringList &inputData,
85 const QString &finalOutputPath) {
86
87 if (isRunning()) {
88 emit error("Processes are already running");
89 return;
90 }
91
92 if (inputData.isEmpty()) {
93 emit error("No input data to process");
94 return;
95 }
96
97 programPath_ = programPath;
98 finalOutputPath_ = finalOutputPath;
99
100 // Determine chunk count
101 int chunkCount = parallelCount_;
102 if (chunkCount <= 0) {
103 chunkCount = QThread::idealThreadCount();
104 if (chunkCount <= 0) {
105 chunkCount = 4; // Fallback default
106 }
107 }
108
109 // Don't create more chunks than input items
110 chunkCount = std::min(chunkCount, static_cast<int>(inputData.size()));
111
112 emit progress(QString("Dividing %1 items into %2 chunks for parallel processing")
113 .arg(inputData.size()).arg(chunkCount));
114
115 // Divide input data into chunks
116 auto chunks = chunkInputData(inputData, chunkCount);
117
118 // Create temporary directory
119 if (!tempDir_.isValid()) {
120 emit error("Failed to create temporary directory for chunk outputs");
121 return;
122 }
123
124 // Clean up previous run
125 for (auto* process : processes_) {
126 delete process;
127 }
128 processes_.clear();
129 tempOutputPaths_.clear();
130 errorMessages_.clear();
131 completedProcessCount_ = 0;
132 failedProcessCount_ = 0;
133 pendingStdout_.clear();
134 pendingStderr_.clear();
135 elapsed_.restart();
136
137 // Launch processes for each chunk
138 for (int i = 0; i < static_cast<int>(chunks.size()); ++i) {
139 const QStringList& chunk = chunks[i];
140 QString tempOutputPath = getTempOutputPath(i);
141 tempOutputPaths_.append(tempOutputPath);
142
143 // Let subclass prepare arguments
144 QStringList arguments = prepareProcessArguments(i, chunk, tempOutputPath);
145
146 // Create and configure process
147 QProcess* process = new QProcess(this);
148 process->setProgram(programPath_);
149 process->setArguments(arguments);
150 process->setProperty("chunkIndex", i);
151
152 connect(process, &QProcess::finished, this,
153 &ParallelProcessRunner::onProcessFinished);
154 connect(process, &QProcess::errorOccurred, this,
155 &ParallelProcessRunner::onProcessError);
156 connect(process, &QProcess::readyReadStandardOutput, this,
157 &ParallelProcessRunner::onProcessStdOut);
158 connect(process, &QProcess::readyReadStandardError, this,
159 &ParallelProcessRunner::onProcessStdErr);
160
161 processes_.push_back(process);
162
163 qDebug() << "Starting chunk" << i << "with" << chunk.size() << "items";
164 process->start();
165 }
166
167 emit progress(QString("Started %1 parallel processes").arg(chunks.size()));
168}
169
170QString ParallelProcessRunner::getTempOutputPath(int chunkIndex) const {
171 return tempDir_.filePath(QString("output_%1.tmp").arg(chunkIndex));
172}
173
174void ParallelProcessRunner::onProcessFinished(
175 int exitCode, QProcess::ExitStatus exitStatus) {
176
177 QProcess* process = qobject_cast<QProcess*>(sender());
178 if (!process) return;
179
180 QString stderr_output;
181 drainProcessOutput(process, pendingStdout_, pendingStderr_,
182 processSource(process),
183 [this](const LogEntry &entry) { emit logMessage(entry); },
184 nullptr, &stderr_output);
185
186 int chunkIndex = process->property("chunkIndex").toInt();
187 int totalChunks = static_cast<int>(processes_.size());
188
189 if (exitStatus != QProcess::NormalExit || exitCode != 0) {
190 failedProcessCount_++;
191 QString errorMsg = QString("Chunk %1 failed with exit code %2:\n%3")
192 .arg(chunkIndex).arg(exitCode).arg(stderr_output);
193 errorMessages_.append(errorMsg);
194 qDebug() << errorMsg;
195 emit chunkFailed(chunkIndex, errorMsg);
196 } else {
197 completedProcessCount_++;
198 qDebug() << "Chunk" << chunkIndex << "completed successfully";
199 emit chunkCompleted(chunkIndex, totalChunks);
200 }
201
202 checkAllProcessesCompleted();
203
204 pendingStdout_.remove(process);
205 pendingStderr_.remove(process);
206}
207
208void ParallelProcessRunner::onProcessError(QProcess::ProcessError processError) {
209 QProcess* process = qobject_cast<QProcess*>(sender());
210 if (!process) return;
211
212 int chunkIndex = process->property("chunkIndex").toInt();
213 failedProcessCount_++;
214
215 drainProcessOutput(process, pendingStdout_, pendingStderr_,
216 processSource(process),
217 [this](const LogEntry &entry) { emit logMessage(entry); });
218
219 QString errorMessage;
220 QString systemError = process->errorString();
221 switch (processError) {
222 case QProcess::FailedToStart:
223 errorMessage = QString("Chunk %1: Failed to start process: %2")
224 .arg(chunkIndex).arg(systemError);
225 break;
226 case QProcess::Crashed:
227 errorMessage = QString("Chunk %1: Process crashed: %2")
228 .arg(chunkIndex).arg(systemError);
229 break;
230 default:
231 errorMessage = QString("Chunk %1: Process error %2: %3")
232 .arg(chunkIndex).arg(processError).arg(systemError);
233 break;
234 }
235
236 errorMessages_.append(errorMessage);
237 qDebug() << errorMessage;
238 emit chunkFailed(chunkIndex, errorMessage);
239
240 emitErrorLog(processSource(process), errorMessage,
241 [this](const LogEntry &entry) { emit logMessage(entry); });
242
243 checkAllProcessesCompleted();
244}
245
246void ParallelProcessRunner::onProcessStdOut() {
247 QProcess* process = qobject_cast<QProcess*>(sender());
248 if (!process) return;
249 emitParsedOutput(pendingStdout_, process,
250 QString::fromUtf8(process->readAllStandardOutput()),
251 processSource(process), false,
252 [this](const LogEntry &entry) { emit logMessage(entry); });
253}
254
255void ParallelProcessRunner::onProcessStdErr() {
256 QProcess* process = qobject_cast<QProcess*>(sender());
257 if (!process) return;
258 emitParsedOutput(pendingStderr_, process,
259 QString::fromUtf8(process->readAllStandardError()),
260 processSource(process), true,
261 [this](const LogEntry &entry) { emit logMessage(entry); });
262}
263
264QString ParallelProcessRunner::processSource(QProcess *process) const {
265 const int chunkIndex = process ? process->property("chunkIndex").toInt() : 0;
266 QString base = QFileInfo(programPath_).baseName();
267 if (base.isEmpty()) {
268 base = "process";
269 }
270 if (processes_.size() > 1) {
271 return QString("%1[%2]").arg(base).arg(chunkIndex);
272 }
273 return base;
274}
275
276void ParallelProcessRunner::checkAllProcessesCompleted() {
277 int totalProcesses = static_cast<int>(processes_.size());
278 int finishedProcesses = completedProcessCount_ + failedProcessCount_;
279
280 if (finishedProcesses < totalProcesses) {
281 emit progress(QString("Completed %1/%2 chunks")
282 .arg(finishedProcesses).arg(totalProcesses));
283 return;
284 }
285
286 // All processes finished
287 onAllCompleted(completedProcessCount_, failedProcessCount_, totalProcesses);
288}
289
291 int successCount, int failureCount, int totalCount) {
292
293 if (failureCount == totalCount) {
294 emit error(QString("All %1 chunks failed").arg(totalCount));
295 return;
296 }
297
298 if (successCount == 0) {
299 emit error("No chunks completed successfully");
300 return;
301 }
302
303 emit progress("All chunks completed, merging results...");
304
305 // Merge results
306 QString mergeError;
307 bool mergeSuccess = mergeResults(tempOutputPaths_, finalOutputPath_, mergeError);
308
309 if (!mergeSuccess) {
310 emit error(QString("Failed to merge chunk outputs: %1").arg(mergeError));
311 return;
312 }
313
314 const double seconds = elapsed_.isValid()
315 ? static_cast<double>(elapsed_.elapsed()) / 1000.0
316 : 0.0;
317 emit progress(
318 QString("Results merged successfully in %1s")
319 .arg(QString::number(seconds, 'f', 2)));
320}
321
322} // namespace acav
Generic parallel process runner for executing multiple instances of a program.
Helpers for draining process output into log entries.
virtual std::vector< QStringList > chunkInputData(const QStringList &inputData, int chunkCount) const
Divide input data into chunks for parallel processing.
void error(const QString &errorMessage)
Emitted when an error occurs that prevents execution.
QString getTempOutputPath(int chunkIndex) const
Get path to temporary output file for a chunk.
void progress(const QString &message)
Emitted with progress updates (e.g., "Completed 3/8 chunks").
virtual bool mergeResults(const QStringList &tempOutputPaths, const QString &finalOutputPath, QString &errorMessage)=0
Merge outputs from all chunks into final result.
QString finalOutputPath_
Final output file path.
void setParallelCount(int count)
Set number of parallel processes (0 = auto-detect from CPU cores).
void cancel()
Cancel all running processes.
virtual QStringList prepareProcessArguments(int chunkIndex, const QStringList &chunkData, const QString &tempOutputPath)=0
Prepare command-line arguments for a specific chunk.
virtual void onAllCompleted(int successCount, int failureCount, int totalCount)
Called when all processes have completed (success or failure).
bool isRunning() const
Check if any process is currently running.
QStringList errorMessages_
Aggregated error messages.
void runParallel(const QString &programPath, const QStringList &inputData, const QString &finalOutputPath)
Start parallel execution.
QTemporaryDir tempDir_
Temporary directory for chunk outputs.