抓取结果类

这个类用来存放职位抓取模块(爬虫模块)的抓取状态和结果,从而给职位抓取模块提供中断续做等功能。 它和其他实体类比起来比较特殊,它最多只会有两个实例,一个实例存放现在正在进行的抓取作业的状态,另一个实例存放上次进行的抓取作业的结果。

属性和方法如下:

实例属性:

属性名称 类型 描述
job_id_set std::set 抓取到的职位的id
is_working bool 是否正在进行抓取作业
begin_time Base::DateTime 开始抓取作业的的时间
is_special bool 是否为特殊抓取任务
first_list_page_hash uint64_t 抓取到的列表页的第一页的html的hash值。每次要开始抓取任务前,会判断上次抓取结果的这个值和这次得到的hash值是否一致。若一致说明列表页第一页内容没发生变化,则不进行抓取作业
claw_start_date Base::Date 抓取范围的起始日期(包含当天)
claw_target_date Base::Date 抓取范围的结束日期(包含当天)
claw_current_date Base::Date 表示正在抓取哪一天发布的职位
first_list_page_first_date Base::Date 列表页第1页第1个职位的发布日期
first_list_page_last_date Base::Date 列表页第1页最后一个职位的发布日期
current_clawing_index int 表示当前抓取到列表页的职位的索引(从0开始,每10个为1页)
total_detail_parse_count int 一共解析了多少次详情页
total_detail_parse_errors int 解析详情页失败的总次数
total_network_errors int 发生网络错误的总次数
total_parse_errors int 解析html失败的总次数
map_item_index_by_date std::unordered_map 这个map表示第1次出现给定日期发布的职位在列表页中的索引。它的key是发布日期的时间戳。

如表格所示,抓取结果类的属性提供了爬虫工作所需的各种信息,爬虫模块进行作业的途中会更新这些属性,而当程序意外中断时,下次启动时也可以凭借这些信息还原工作的现场。

实例方法

只有各属性的get和set方法,略过。

类方法

类方法 描述
static void CommitInitInfo() 抓取作业开始时会调用此方法,将抓取作业的初始化信息提交到数据库
static void CommitStatus() 将目前抓取到的信息提交到数据库
static JobClawResult& GetCurrentClawResult() 获取当前抓取作业的抓取结果类实例
static const JobClawResult& GetLastClawResult() 获取上次抓取作业的抓取结果类实例(因为结果已确定因此是只读的)
static void SaveCurrentResult() 在抓取作业结束后调用,将会将"当前"结果移动到"上次"结果覆盖,然后清空本次作业的结果,将is_working状态置为false
static void PutIntoJobSet(int id) 将抓取到的职位的id放入抓取结果中
static void SetItemIndexOfDate(const Base::Date& date, int item_index) 此方法用来更新map_item_index_by_date,表示第一次出现某个日期发布的职位于列表页的索引
static bool GetLastResultOfItemIndexOfDate(const Base::Date& date, int& item_index) 此方法用来获取上次抓取结果中,第一次出现某个日期发布的职位于列表页的索引,该方法的意义在于那上次的结果来预测本次的结果。
static bool IfJobInfoUpdated() 开始抓取作业时调用,会比较这次抓取和上次抓取的first_list_page_hash(列表页第1页html的hash值),若两个值相同代表列表页没发生变化,则跳过本次抓取作业
static void AsyncLoadFromDatabase() 从数据库加载信息到STL容器(异步),以后获取信息就可以跳过数据库直接从内存获取
static void WaitForReady() 等待抓取结果库就绪(和上面的方法成对,调用该方法会阻塞线程,直到已经从数据库加载完所需数据)

代码:

void JobClawResult::CommitInitInfo()
{
    using namespace Base::Database;
    static const std::string kSql("UPDATE tjh_clawresult_detail SET "
        "claw_start_date=?,claw_target_date=?,first_list_page_hash=?,is_special=?,begin_time=?,"
        "total_detail_parse_count=0,total_detail_parse_errors=0,total_network_errors=0,total_parse_errors=0 "
        "WHERE claw_id=0;");
    JobClawResult& c = _claw_result_curr;
    Common::DBManager::EnqueueDMLTask(SqlTaskPriority::Normal, kSql,
        c.m_claw_start_date, c.m_claw_target_date,c.m_first_list_page_hash,c.m_is_special,c.m_begin_time
        );
}

void JobClawResult::CommitStatus()
{
    using namespace Base::Database;
    static const std::string kSql("UPDATE tjh_clawresult_detail SET claw_current_index=?,claw_current_date=?,"
        "total_detail_parse_count=?,total_detail_parse_errors=?,total_network_errors=?,total_parse_errors=?,is_working=1 "
        "WHERE claw_id=0;");
    JobClawResult& c = _claw_result_curr;
    Common::DBManager::EnqueueDMLTask(SqlTaskPriority::Normal, kSql,
        c.m_current_clawing_index, c.m_claw_current_date,
        c.m_total_detail_parse_count, c.m_total_detail_parse_errors, c.m_total_network_errors, c.m_total_parse_errors
    );
}

//将此次的抓取结果保存到数据库,并记录上次结果
void JobClawResult::SaveCurrentResult()
{
    JobClawResult &c = _claw_result_curr, &l = _claw_result_last;
    bool is_hash_equal = l.m_first_list_page_hash == c.m_first_list_page_hash;
    if (is_hash_equal)
    {
        //职位信息未发生变动,那么两个map可以进行合并操作
        for (auto pair : c.m_map_item_index_by_date)
        {
            if (!Common::ContainerHelper::Contains(l.m_map_item_index_by_date, pair.first))
            {
                l.m_map_item_index_by_date.emplace(pair.first, pair.second);
            }
        }
    }
    else
    {
        //职位信息已发生变动,上次的结果已过期,直接覆盖
        l.m_map_item_index_by_date = c.m_map_item_index_by_date;
    }

    l.m_first_list_page_hash = c.m_first_list_page_hash;
    l.m_current_clawing_index = c.m_current_clawing_index;
    l.m_total_detail_parse_count = c.m_total_detail_parse_count;
    l.m_total_detail_parse_errors = c.m_total_detail_parse_errors;
    l.m_total_network_errors = c.m_total_network_errors;
    l.m_total_parse_errors = c.m_total_parse_errors;
    l.m_claw_start_date = c.m_claw_start_date;
    l.m_claw_target_date = c.m_claw_target_date;
    l.m_claw_current_date = c.m_claw_current_date;
    l.m_is_working = c.m_is_working;
    l.m_begin_time = c.m_begin_time;
    l.m_is_special = c.m_is_special;

    c.m_total_detail_parse_count = 0;
    c.m_total_detail_parse_errors = 0;
    c.m_total_network_errors = 0;
    c.m_total_parse_errors = 0;
    c.m_is_working = false;
    c.m_is_special = false;
    c.m_map_item_index_by_date.clear();
    c.m_job_id_set.clear();

    //将结果写入数据库
    using namespace Base::Database;
    static const std::string kSql1("DELETE FROM tjh_clawresult_detail WHERE claw_id=1;");
    static const std::string kSql2("UPDATE tjh_clawresult_detail SET claw_id=1,is_working=0 WHERE claw_id=0;");
    static const std::string kSql3("INSERT INTO tjh_clawresult_detail(claw_id,is_working) VALUES(0,0);");
    //因为这必须是一个原子操作,要么做要么不做,所以将5个操作组成一个事务
    MultiSqlTask *task = new MultiSqlTask;
    task->emplace(SqlTaskType::DML, kSql1);
    task->emplace(SqlTaskType::DML, kSql2);
    task->emplace(SqlTaskType::DML, kSql3);

    if (is_hash_equal)
    {
        static const std::string kSql4("DELETE t1.* FROM tjh_clawresult_dateinfo AS t1 "
            "INNER JOIN tjh_clawresult_dateinfo AS t2 "
            "ON t1.claw_id!=t2.claw_id AND t1.job_date=t2.job_date "
            "WHERE t1.claw_id=0;");
        task->emplace(SqlTaskType::DML, kSql4);//先去重,再更新
    }
    else
    {
        static const std::string kSql4("DELETE FROM tjh_clawresult_dateinfo WHERE claw_id=1;");
        task->emplace(SqlTaskType::DML, kSql4);
    }

    static const std::string kSql5("UPDATE tjh_clawresult_dateinfo SET claw_id=1 WHERE claw_id=0;");
    task->emplace(SqlTaskType::DML, kSql5);

    if (l.m_is_special)
    {
        static const std::string kSql6("DELETE FROM tjh_clawresult_joblist;");//抓取完成后这个清单就用不上了,清空
        task->emplace(SqlTaskType::DML, kSql6);

        _last_special_time = _claw_result_last.m_begin_time;
        _last_special_first_list_page_hash = _claw_result_last.m_first_list_page_hash;
        static const std::string kSql7("UPDATE tjh_clawresult_last_special_info SET last_special_time=?,first_list_page_hash=?;");
        task->emplace(SqlTaskType::DML, kSql7, _last_special_time, _last_special_first_list_page_hash);
    }

    Common::DBManager::EnqueueMultiTask(SqlTaskPriority::Normal,task);
}

void JobClawResult::PutIntoJobSet(int id)
{
    using namespace Base::Database;
    if (!Common::ContainerHelper::Contains(_claw_result_curr.m_job_id_set, id))
    {
        _claw_result_curr.m_job_id_set.emplace(id);
        static const std::string kSql("INSERT INTO tjh_clawresult_joblist(job_id) VALUES(?);");
        Common::DBManager::EnqueueDMLTask(SqlTaskPriority::Normal, kSql, id);
    }
}

//保存职位信息的抓取结果,下次抓取时就可以用本次结果来进行预测
void JobClawResult::SetItemIndexOfDate(const Base::Date& date, int item_index)
{
    int date_hash = Common::DateHelper::ToInt(date);        
    if (!Common::ContainerHelper::Contains(_claw_result_curr.m_map_item_index_by_date, date_hash))
    {
        _claw_result_curr.m_map_item_index_by_date.emplace(date_hash, item_index);
        Global::LogOfClawer().info(fmt("职位的日期[%1%]于第%2%个职位中首次出现") % Common::DateHelper::ToString(date) % (item_index+1));

        using namespace Base::Database;
        static const std::string kSql("INSERT INTO tjh_clawresult_dateinfo(claw_id,job_date,item_index) VALUES(0,?,?);");
        Common::DBManager::EnqueueDMLTask(SqlTaskPriority::Normal, kSql, date, item_index);
    }
}

bool JobClawResult::GetLastResultOfItemIndexOfDate(const Base::Date& date, int& item_index)
{
    if (IfJobInfoUpdated())return false;//如果页面发生变动,那么上次的结果已经失效,不再具有参考意义

    int date_hash = Common::DateHelper::ToInt(date);
    auto it = _claw_result_last.m_map_item_index_by_date.find(date_hash);

    if (it != _claw_result_last.m_map_item_index_by_date.end())
    {
        item_index = it->second;
        return true;
    }
    return false;        
}

bool JobClawResult::IfJobInfoUpdated()
{
    return _claw_result_curr.m_first_list_page_hash == 0 || _claw_result_curr.m_first_list_page_hash != _claw_result_last.m_first_list_page_hash;
}

void JobClawResult::AsyncLoadFromDatabase()
{
    using namespace Base::Database;
    using Common::DateHelper;
    using Base::Date;

    static const std::string kSqlQueryClawResultDateInfo("SELECT * FROM tjh_clawresult_dateinfo;");
    Common::DBManager::EnqueueDQLTask(SqlTaskPriority::Highest, [](const DQLSqlTask& task){
        foreach_line_in_resultset_init(task, [](sql::ResultSet* res){
            JobClawResult& claw_result = res->getInt("claw_id")==0 ? _claw_result_curr : _claw_result_last;
            claw_result.m_map_item_index_by_date.emplace(DateHelper::ToInt(DateHelper::ToDate(res->getString("job_date").c_str())), res->getInt("item_index"));
        });
    }, kSqlQueryClawResultDateInfo);


    static const std::string kSqlQueryClawResultLastSpecialTimeInfo("SELECT * FROM tjh_clawresult_last_special_info;");
    Common::DBManager::EnqueueDQLTask(SqlTaskPriority::Highest, [](const DQLSqlTask& task){
        deal_with_resultset_init(task, [](sql::ResultSet* res){
            if (res->next())
            {
                _last_special_time = DateHelper::ToDateTime(res->getString("last_special_time"));
                _last_special_first_list_page_hash = res->getUInt64("first_list_page_hash");
            }
            else
            {
                static const std::string kSqlInsertDefaultClawLastSpecialTime
                    ("INSERT INTO tjh_clawresult_last_special_info(last_special_time,first_list_page_hash) VALUES(?,0)");
                Common::DBManager::EnqueueDMLTask(SqlTaskPriority::Normal, kSqlInsertDefaultClawLastSpecialTime, _last_special_time);
            }
        });
    }, kSqlQueryClawResultLastSpecialTimeInfo);

    static const std::string kSqlQueryClawResultJobList("SELECT job_id FROM tjh_clawresult_joblist;");
    Common::DBManager::EnqueueDQLTask(SqlTaskPriority::Highest, [](const DQLSqlTask& task){
        foreach_line_in_resultset_init(task, [](sql::ResultSet* res){
            _claw_result_curr.m_job_id_set.emplace(res->getInt("job_id"));
        });
    }, kSqlQueryClawResultJobList);

    static const std::string kSqlQueryClawResult("SELECT * FROM tjh_clawresult_detail;");
    Common::DBManager::EnqueueDQLTask(SqlTaskPriority::Highest, [](const DQLSqlTask& task){
        deal_with_resultset_init(task, [](sql::ResultSet* res){
            bool curr_result_exists = false, last_result_exists = false;
            JobClawResult* p_claw_result;
            while (res->next())
            {
                switch (res->getInt("claw_id"))
                {
                case 0:
                    p_claw_result = &_claw_result_curr;
                    curr_result_exists = true;
                    break;
                case 1:
                    p_claw_result = &_claw_result_last;
                    last_result_exists = true;
                    break;
                default:
                    continue;
                }
                p_claw_result->m_current_clawing_index = res->getInt("claw_current_index");
                p_claw_result->m_claw_start_date = DateHelper::ToDate(res->getString("claw_start_date"));
                p_claw_result->m_claw_target_date = DateHelper::ToDate(res->getString("claw_target_date"));
                p_claw_result->m_claw_current_date = DateHelper::ToDate(res->getString("claw_current_date"));
                p_claw_result->m_first_list_page_hash = res->getUInt64("first_list_page_hash");
                p_claw_result->m_total_detail_parse_count = res->getInt("total_detail_parse_count");
                p_claw_result->m_total_detail_parse_errors = res->getInt("total_detail_parse_errors");
                p_claw_result->m_total_network_errors = res->getInt("total_network_errors");
                p_claw_result->m_total_parse_errors = res->getInt("total_parse_errors");
                p_claw_result->m_is_special = res->getBoolean("is_special");
                p_claw_result->m_is_working = res->getBoolean("is_working");
                p_claw_result->m_begin_time = DateHelper::ToDateTime(res->getString("begin_time"));
            }
            static const std::string kSqlInsertClawResult("INSERT INTO tjh_clawresult_detail(claw_id,is_special,is_working) VALUES(?,?,?);");
            if (!curr_result_exists)
            {
                Common::DBManager::EnqueueDMLTask(SqlTaskPriority::Normal, kSqlInsertClawResult, 0, false, false);
            }
            if (!last_result_exists)
            {
                Common::DBManager::EnqueueDMLTask(SqlTaskPriority::Normal, kSqlInsertClawResult, 1, false, false);
            }
        });
        _is_ready = true;
        _cond_ready.notify_one();
    }, kSqlQueryClawResult);
}

void JobClawResult::WaitForReady()
{
    if (_is_ready)return;
    std::mutex mutex;
    std::unique_lock<std::mutex> lock(mutex);
    _cond_ready.wait(lock);
}

results matching ""

    No results matching ""