From 5c07e1b042ecd22dfcd3ea8cf656c99549cc984b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Mr=C3=A1zek?= Date: Sat, 6 Jul 2013 01:50:07 +0200 Subject: Move job queue to libutil --- asset_test.cpp | 344 +-------------------------------------------- libutil/CMakeLists.txt | 3 + libutil/include/dlqueue.h | 49 +++++++ libutil/include/jobqueue.h | 179 +++++++++++++++++++++++ libutil/src/dlqueue.cpp | 128 +++++++++++++++++ 5 files changed, 360 insertions(+), 343 deletions(-) create mode 100644 libutil/include/dlqueue.h create mode 100644 libutil/include/jobqueue.h create mode 100644 libutil/src/dlqueue.cpp diff --git a/asset_test.cpp b/asset_test.cpp index ca4ef506..cd2cccd4 100644 --- a/asset_test.cpp +++ b/asset_test.cpp @@ -1,348 +1,6 @@ -#include -#include #include #include - -enum JobStatus -{ - Job_NotStarted, - Job_InProgress, - Job_Finished, - Job_Failed -}; - -class JobList; - -class Job : public QObject -{ - Q_OBJECT -protected: - explicit Job(): QObject(0){}; -public: - virtual ~Job() {}; -signals: - void finish(); - void fail(); - void progress(qint64 current, qint64 total); -public slots: - virtual void start() = 0; -}; -typedef QSharedPointer JobPtr; - -/** - * A list of jobs, to be processed one by one. - */ -class JobList : public QObject -{ - friend class JobListQueue; - Q_OBJECT -public: - - JobList() : QObject(0) - { - m_status = Job_NotStarted; - current_job_idx = 0; - } - JobStatus getStatus() - { - return m_status; - } - void add(JobPtr dlable) - { - if(m_status == Job_NotStarted) - m_jobs.append(dlable); - //else there's a bug. TODO: catch the bugs - } - JobPtr getFirstJob() - { - if(m_jobs.size()) - return m_jobs[0]; - else - return JobPtr(); - } - void start() - { - current_job_idx = 0; - auto job = m_jobs[current_job_idx]; - - connect(job.data(), SIGNAL(progress(qint64,qint64)), SLOT(currentJobProgress(qint64,qint64))); - connect(job.data(), SIGNAL(finish()), SLOT(currentJobFinished())); - connect(job.data(), SIGNAL(fail()), SLOT(currentJobFailed())); - job->start(); - emit started(); - } -private slots: - void currentJobFinished() - { - if(current_job_idx == m_jobs.size() - 1) - { - m_status = Job_Finished; - emit finished(); - } - else - { - current_job_idx++; - auto job = m_jobs[current_job_idx]; - connect(job.data(), SIGNAL(progress(qint64,qint64)), SLOT(currentJobProgress(qint64,qint64))); - connect(job.data(), SIGNAL(finish()), SLOT(currentJobFinished())); - connect(job.data(), SIGNAL(fail()), SLOT(currentJobFailed())); - job->start(); - } - } - void currentJobFailed() - { - m_status = Job_Failed; - emit failed(); - } - void currentJobProgress(qint64 current, qint64 total) - { - if(!total) - return; - - int total_jobs = m_jobs.size(); - - if(!total_jobs) - return; - - float job_chunk = 1000.0 / float(total_jobs); - float cur = current; - float tot = total; - float last_chunk = (cur / tot) * job_chunk; - - float list_total = job_chunk * current_job_idx + last_chunk; - emit progress(qint64(list_total), 1000LL); - } -private: - QVector m_jobs; - /// The overall status of this job list - JobStatus m_status; - int current_job_idx; -signals: - void progress(qint64 current, qint64 total); - void started(); - void finished(); - void failed(); -}; -typedef QSharedPointer JobListPtr; - - -/** - * A queue of job lists! The job lists fail or finish as units. - */ -class JobListQueue : public QObject -{ - Q_OBJECT -public: - JobListQueue(QObject *p = 0): - QObject(p), - nam(new QNetworkAccessManager()), - currentIndex(0), - is_running(false){} - - void enqueue(JobListPtr job) - { - jobs.enqueue(job); - - // finish or fail, we should catch that and start the next one - connect(job.data(),SIGNAL(finished()), SLOT(startNextJob())); - connect(job.data(),SIGNAL(failed()), SLOT(startNextJob())); - - if(!is_running) - { - QTimer::singleShot(0, this, SLOT(startNextJob())); - } - } - -private slots: - void startNextJob() - { - if (jobs.isEmpty()) - { - currentJobList.clear(); - currentIndex = 0; - is_running = false; - emit finishedAllJobs(); - return; - } - - currentJobList = jobs.dequeue(); - is_running = true; - currentIndex = 0; - currentJobList->start(); - } - -signals: - void finishedAllJobs(); - -private: - JobListPtr currentJobList; - QQueue jobs; - QSharedPointer nam; - unsigned currentIndex; - bool is_running; -}; - -/** - * A single file for the downloader/cache to process. - */ -class DownloadJob : public Job -{ - friend class Downloader; - Q_OBJECT -private: - DownloadJob(QUrl url, QString rel_target_path = QString(), QString expected_md5 = QString()) - :Job() - { - m_url = url; - m_rel_target_path = rel_target_path; - m_expected_md5 = expected_md5; - - m_check_md5 = m_expected_md5.size(); - m_save_to_file = m_rel_target_path.size(); - m_status = Job_NotStarted; - }; -public: - static JobPtr create(QUrl url, QString rel_target_path = QString(), QString expected_md5 = QString()) - { - return JobPtr(new DownloadJob(url, rel_target_path, expected_md5)); - } -public slots: - virtual void start() - { - m_manager.reset(new QNetworkAccessManager()); - if(m_save_to_file) - { - QString filename = m_rel_target_path; - m_output_file.setFileName(filename); - // if there already is a file and md5 checking is in effect - if(m_output_file.exists() && m_check_md5) - { - // and it can be opened - if(m_output_file.open(QIODevice::ReadOnly)) - { - // check the md5 against the expected one - QString hash = QCryptographicHash::hash(m_output_file.readAll(), QCryptographicHash::Md5).toHex().constData(); - m_output_file.close(); - // skip this file if they match - if(hash == m_expected_md5) - { - qDebug() << "Skipping " << m_url.toString() << ": md5 match."; - emit finish(); - return; - } - } - } - QFileInfo a(filename); - QDir dir; - if(!dir.mkpath(a.path())) - { - /* - * error when making the folder structure - */ - emit fail(); - return; - } - if (!m_output_file.open(QIODevice::WriteOnly)) - { - /* - * Can't open the file... the job failed - */ - emit fail(); - return; - } - } - qDebug() << "Downloading " << m_url.toString(); - QNetworkRequest request(m_url); - QNetworkReply * rep = m_manager->get(request); - m_reply = QSharedPointer(rep, &QObject::deleteLater); - connect(rep, SIGNAL(downloadProgress(qint64,qint64)), SLOT(downloadProgress(qint64,qint64))); - connect(rep, SIGNAL(finished()), SLOT(downloadFinished())); - connect(rep, SIGNAL(error(QNetworkReply::NetworkError)), SLOT(downloadError(QNetworkReply::NetworkError))); - connect(rep, SIGNAL(readyRead()), SLOT(downloadReadyRead())); - }; -private slots: - void downloadProgress(qint64 bytesReceived, qint64 bytesTotal) - { - emit progress(bytesReceived, bytesTotal); - }; - - void downloadError(QNetworkReply::NetworkError error) - { - // error happened during download. - // TODO: log the reason why - m_status = Job_Failed; - } - - void downloadFinished() - { - // if the download succeeded - if(m_status != Job_Failed) - { - // nothing went wrong... - m_status = Job_Finished; - // save the data to the downloadable if we aren't saving to file - if(!m_save_to_file) - { - m_data = m_reply->readAll(); - } - else - { - m_output_file.close(); - } - - //TODO: check md5 here! - m_reply.clear(); - emit finish(); - return; - } - // else the download failed - else - { - if(m_save_to_file) - { - m_output_file.close(); - m_output_file.remove(); - } - m_reply.clear(); - emit fail(); - return; - } - } - void downloadReadyRead() - { - if(m_save_to_file) - { - m_output_file.write( m_reply->readAll()); - } - }; - -public: - /// the associated network manager - QSharedPointer m_manager; - /// the network reply - QSharedPointer m_reply; - /// source URL - QUrl m_url; - - /// if true, check the md5sum against a provided md5sum - /// also, if a file exists, perform an md5sum first and don't download only if they don't match - bool m_check_md5; - /// the expected md5 checksum - QString m_expected_md5; - - /// save to file? - bool m_save_to_file; - /// if saving to file, use the one specified in this string - QString m_rel_target_path; - /// this is the output file, if any - QFile m_output_file; - /// if not saving to file, downloaded data is placed here - QByteArray m_data; - - - /// The file's status - JobStatus m_status; -}; +#include "dlqueue.h" inline QDomElement getDomElementByTagName(QDomElement parent, QString tagname) { diff --git a/libutil/CMakeLists.txt b/libutil/CMakeLists.txt index caafc756..864c7714 100644 --- a/libutil/CMakeLists.txt +++ b/libutil/CMakeLists.txt @@ -32,6 +32,8 @@ include/osutils.h include/userutils.h include/cmdutils.h include/netutils.h +include/jobqueue.h +include/dlqueue.h ) SET(LIBUTIL_SOURCES @@ -40,6 +42,7 @@ src/osutils.cpp src/userutils.cpp src/cmdutils.cpp src/netutils.cpp +src/dlqueue.cpp ) # Set the include dir path. diff --git a/libutil/include/dlqueue.h b/libutil/include/dlqueue.h new file mode 100644 index 00000000..9041e762 --- /dev/null +++ b/libutil/include/dlqueue.h @@ -0,0 +1,49 @@ +#pragma once +#include "jobqueue.h" +#include + +/** + * A single file for the downloader/cache to process. + */ +class DownloadJob : public Job +{ + Q_OBJECT +public: + DownloadJob(QUrl url, QString rel_target_path = QString(), QString expected_md5 = QString()); + static JobPtr create(QUrl url, QString rel_target_path = QString(), QString expected_md5 = QString()); + +public slots: + virtual void start(); + +private slots: + void downloadProgress(qint64 bytesReceived, qint64 bytesTotal);; + void downloadError(QNetworkReply::NetworkError error); + void downloadFinished(); + void downloadReadyRead(); + +public: + /// the associated network manager + QSharedPointer m_manager; + /// the network reply + QSharedPointer m_reply; + /// source URL + QUrl m_url; + + /// if true, check the md5sum against a provided md5sum + /// also, if a file exists, perform an md5sum first and don't download only if they don't match + bool m_check_md5; + /// the expected md5 checksum + QString m_expected_md5; + + /// save to file? + bool m_save_to_file; + /// if saving to file, use the one specified in this string + QString m_rel_target_path; + /// this is the output file, if any + QFile m_output_file; + /// if not saving to file, downloaded data is placed here + QByteArray m_data; + + /// The file's status + JobStatus m_status; +}; diff --git a/libutil/include/jobqueue.h b/libutil/include/jobqueue.h new file mode 100644 index 00000000..061686f6 --- /dev/null +++ b/libutil/include/jobqueue.h @@ -0,0 +1,179 @@ +#pragma once +#include + +enum JobStatus +{ + Job_NotStarted, + Job_InProgress, + Job_Finished, + Job_Failed +}; + +class JobList; + +class Job : public QObject +{ + Q_OBJECT +protected: + explicit Job(): QObject(0){}; +public: + virtual ~Job() {}; +signals: + void finish(); + void fail(); + void progress(qint64 current, qint64 total); +public slots: + virtual void start() = 0; +}; +typedef QSharedPointer JobPtr; + +/** + * A list of jobs, to be processed one by one. + */ +class JobList : public QObject +{ + friend class JobListQueue; + Q_OBJECT +public: + + JobList() : QObject(0) + { + m_status = Job_NotStarted; + current_job_idx = 0; + } + JobStatus getStatus() + { + return m_status; + } + void add(JobPtr dlable) + { + if(m_status == Job_NotStarted) + m_jobs.append(dlable); + //else there's a bug. TODO: catch the bugs + } + JobPtr getFirstJob() + { + if(m_jobs.size()) + return m_jobs[0]; + else + return JobPtr(); + } + void start() + { + current_job_idx = 0; + auto job = m_jobs[current_job_idx]; + + connect(job.data(), SIGNAL(progress(qint64,qint64)), SLOT(currentJobProgress(qint64,qint64))); + connect(job.data(), SIGNAL(finish()), SLOT(currentJobFinished())); + connect(job.data(), SIGNAL(fail()), SLOT(currentJobFailed())); + job->start(); + emit started(); + } +private slots: + void currentJobFinished() + { + if(current_job_idx == m_jobs.size() - 1) + { + m_status = Job_Finished; + emit finished(); + } + else + { + current_job_idx++; + auto job = m_jobs[current_job_idx]; + connect(job.data(), SIGNAL(progress(qint64,qint64)), SLOT(currentJobProgress(qint64,qint64))); + connect(job.data(), SIGNAL(finish()), SLOT(currentJobFinished())); + connect(job.data(), SIGNAL(fail()), SLOT(currentJobFailed())); + job->start(); + } + } + void currentJobFailed() + { + m_status = Job_Failed; + emit failed(); + } + void currentJobProgress(qint64 current, qint64 total) + { + if(!total) + return; + + int total_jobs = m_jobs.size(); + + if(!total_jobs) + return; + + float job_chunk = 1000.0 / float(total_jobs); + float cur = current; + float tot = total; + float last_chunk = (cur / tot) * job_chunk; + + float list_total = job_chunk * current_job_idx + last_chunk; + emit progress(qint64(list_total), 1000LL); + } +private: + QVector m_jobs; + /// The overall status of this job list + JobStatus m_status; + int current_job_idx; +signals: + void progress(qint64 current, qint64 total); + void started(); + void finished(); + void failed(); +}; +typedef QSharedPointer JobListPtr; + + +/** + * A queue of job lists! The job lists fail or finish as units. + */ +class JobListQueue : public QObject +{ + Q_OBJECT +public: + JobListQueue(QObject *p = 0): + QObject(p), + currentIndex(0), + is_running(false){} + + void enqueue(JobListPtr job) + { + jobs.enqueue(job); + + // finish or fail, we should catch that and start the next one + connect(job.data(),SIGNAL(finished()), SLOT(startNextJob())); + connect(job.data(),SIGNAL(failed()), SLOT(startNextJob())); + + if(!is_running) + { + QTimer::singleShot(0, this, SLOT(startNextJob())); + } + } + +private slots: + void startNextJob() + { + if (jobs.isEmpty()) + { + currentJobList.clear(); + currentIndex = 0; + is_running = false; + emit finishedAllJobs(); + return; + } + + currentJobList = jobs.dequeue(); + is_running = true; + currentIndex = 0; + currentJobList->start(); + } + +signals: + void finishedAllJobs(); + +private: + JobListPtr currentJobList; + QQueue jobs; + unsigned currentIndex; + bool is_running; +}; diff --git a/libutil/src/dlqueue.cpp b/libutil/src/dlqueue.cpp new file mode 100644 index 00000000..dfc51f36 --- /dev/null +++ b/libutil/src/dlqueue.cpp @@ -0,0 +1,128 @@ +#include "include/dlqueue.h" + +DownloadJob::DownloadJob ( QUrl url, QString rel_target_path, QString expected_md5 ) + :Job() +{ + m_url = url; + m_rel_target_path = rel_target_path; + m_expected_md5 = expected_md5; + + m_check_md5 = m_expected_md5.size(); + m_save_to_file = m_rel_target_path.size(); + m_status = Job_NotStarted; +} + +JobPtr DownloadJob::create ( QUrl url, QString rel_target_path, QString expected_md5 ) +{ + return JobPtr ( new DownloadJob ( url, rel_target_path, expected_md5 ) ); +} + +void DownloadJob::start() +{ + m_manager.reset ( new QNetworkAccessManager() ); + if ( m_save_to_file ) + { + QString filename = m_rel_target_path; + m_output_file.setFileName ( filename ); + // if there already is a file and md5 checking is in effect + if ( m_output_file.exists() && m_check_md5 ) + { + // and it can be opened + if ( m_output_file.open ( QIODevice::ReadOnly ) ) + { + // check the md5 against the expected one + QString hash = QCryptographicHash::hash ( m_output_file.readAll(), QCryptographicHash::Md5 ).toHex().constData(); + m_output_file.close(); + // skip this file if they match + if ( hash == m_expected_md5 ) + { + qDebug() << "Skipping " << m_url.toString() << ": md5 match."; + emit finish(); + return; + } + } + } + QFileInfo a ( filename ); + QDir dir; + if ( !dir.mkpath ( a.path() ) ) + { + /* + * error when making the folder structure + */ + emit fail(); + return; + } + if ( !m_output_file.open ( QIODevice::WriteOnly ) ) + { + /* + * Can't open the file... the job failed + */ + emit fail(); + return; + } + } + qDebug() << "Downloading " << m_url.toString(); + QNetworkRequest request ( m_url ); + QNetworkReply * rep = m_manager->get ( request ); + m_reply = QSharedPointer ( rep, &QObject::deleteLater ); + connect ( rep, SIGNAL ( downloadProgress ( qint64,qint64 ) ), SLOT ( downloadProgress ( qint64,qint64 ) ) ); + connect ( rep, SIGNAL ( finished() ), SLOT ( downloadFinished() ) ); + connect ( rep, SIGNAL ( error ( QNetworkReply::NetworkError ) ), SLOT ( downloadError ( QNetworkReply::NetworkError ) ) ); + connect ( rep, SIGNAL ( readyRead() ), SLOT ( downloadReadyRead() ) ); +} + +void DownloadJob::downloadProgress ( qint64 bytesReceived, qint64 bytesTotal ) +{ + emit progress ( bytesReceived, bytesTotal ); +} + +void DownloadJob::downloadError ( QNetworkReply::NetworkError error ) +{ + // error happened during download. + // TODO: log the reason why + m_status = Job_Failed; +} + +void DownloadJob::downloadFinished() +{ + // if the download succeeded + if ( m_status != Job_Failed ) + { + // nothing went wrong... + m_status = Job_Finished; + // save the data to the downloadable if we aren't saving to file + if ( !m_save_to_file ) + { + m_data = m_reply->readAll(); + } + else + { + m_output_file.close(); + } + + //TODO: check md5 here! + m_reply.clear(); + emit finish(); + return; + } + // else the download failed + else + { + if ( m_save_to_file ) + { + m_output_file.close(); + m_output_file.remove(); + } + m_reply.clear(); + emit fail(); + return; + } +} + +void DownloadJob::downloadReadyRead() +{ + if ( m_save_to_file ) + { + m_output_file.write ( m_reply->readAll() ); + } +} -- cgit v1.2.3