任务类

任务是一种特殊的查询,如果说普通的查询操作是静态的、一次性的话,那么任务查询则是动态的、持续性的查询。

经过一次查询,用户并不一定从现在的结果中能找到想要,那么系统允许基于这个查询创建持续性的查询任务,每当出现新职位的时候都会进行增量查询,直到找到想要的结果为止。

所以任务类的结构和查询结果类其实比较相似,都会储存查询结果。不同的是查询结果类的结果是静态的,而任务类的查询结果可能随时会增加。

具体属性和方法如下:

实例属性:

属性名称 类型 描述
id int 任务id
name std::string 任务名称(可由用户自定义)
query_expression_str std::string 查询表达式
job_version_after uint64_t 只查询创建时间在这之后的的职位
expire_time Base::DateTime 任务的失效时间(到达失效时间后,后台不再继续查询)
exist bool 任务是否存在(进行删除时,会将这个属性标记为false。下次重启程序时将不加载exist==false的任务,来间接达到删除的目的。这样可以解决一些同步问题)
query_result std::vector 查询结果
user User& 所关联的用户的引用

实例方法(针对某个具体任务的操作):

实例方法 描述
各属性的get方法 如get_id()、get_name()等等
int do_query() 对整个职位库范围进行查询,查询结果将自动添加到query_result中。返回值为所添加的查询结果的数目
int do_query(const std::vector& job_pointer_list) 和上面的方法不同点在于可以给定一个职位数组,在这个数组范围进行查询。返回值为所添加的查询结果的数目

do_query()代码:

int Task::do_query()
{
    if (!m_exist) return -1;

    std::vector<const Job*> query_result;
    uint64_t version = Model::Job::Query(get_expression(), query_result, m_job_version_after);

    add_query_result(query_result,version);
    if (query_result.size()>0 && m_exist) m_user.notify_task_query_result_changed();
    return query_result.size();
}

int Task::do_query(const std::vector<const Job*>& job_pointer_list)
{
    if (!m_exist) return -1;

    std::vector<const Job*> query_result;

    uint64_t version = m_job_version_after;


    for (const Job* p_job : job_pointer_list)
    {
        if (version < p_job->get_version()) version = p_job->get_version();
        if (get_expression()->is_match(*p_job))
        {
            query_result.emplace_back(p_job);
        }
    }

    add_query_result(query_result,version);
    if (query_result.size()>0 && m_exist) m_user.notify_task_query_result_changed();
    return query_result.size();
}

void Task::add_query_result(const std::vector<const Job*>& query_result,uint64_t version)
{
    using namespace Base::Database;

    if (query_result.size() > 0)
    {
        //因为插入的是数组,所以要避免插入一半时出现意外导致数据残缺的情况,要将整个过程当做是一个原子操作对待
        MultiSqlTask *task = new MultiSqlTask;

        static const std::string kSql1("INSERT INTO tjh_task_queryresult_item(task_id,job_id,result_index) VALUES(?,?,?);");

        for (int i = 0, length = query_result.size(), offset = m_query_result.size(); i < length; i++)
        {
            task->emplace(SqlTaskType::DML, kSql1, m_id, query_result[i]->get_id(), offset + i);
        }

        static const std::string kSql2("UPDATE tjh_task_detail SET job_version_after=? WHERE task_id=?;");
        task->emplace(SqlTaskType::DML, kSql2, version, m_id);

        Common::DBManager::EnqueueMultiTask(SqlTaskPriority::Low, task);

        //如果有查询结果,则追加到已有结果当中
        {
            std::unique_lock<std::mutex> lock(m_mutex_data);
            m_query_result.insert(m_query_result.end(), query_result.begin(), query_result.end());
            //更新职位数据库版本,下次查询只查询在这个版本以后的职位
            m_job_version_after = version;
        }
    }
    else
    {
        std::unique_lock<std::mutex> lock(m_mutex_data);
        //更新职位数据库版本,下次查询只查询在这个版本以后的职位
        m_job_version_after = version;
    }
}

类方法(针对整个任务库的操作):

类方法 描述
static Task& Add(const std::string& task_name,User& user,const std::string& query_expression_str,uint64_t job_version_after,const Base::DateTime &expire_time) 向整个任务库添加一个任务
static Task* GetPointer(int id) 通过任务id获取任务实例
static bool ContainsKey(int id) 判断是否存在该id的任务
static void Erase(int id) 删除任务(采用比较柔和的手段)
static std::vector ForEach(const std::function& func) 遍历所有任务执行特定操作
static void SetTaskAddedCallBack(const std::function& callback) 注册任务添加事件的监听器,任务处理模块会调用此方法注册自身,这样每当有新任务时任务处理模块能够第一时间得知
static void AsyncLoadFromDataBase() 从数据库加载信息到STL容器(异步),以后获取信息就可以跳过数据库直接从内存获取
static void WaitForReady() 等待任务库就绪(和上面的方法成对,调用该方法会阻塞线程,直到已经从数据库加载完所需数据)

代码:

Task& Task::Add(const std::string& task_name,
    User& user,
    const std::string& query_expression_str,
    uint64_t job_version_after,
    const Base::DateTime &expire_time)
{
    using Common::DBManager;
    using namespace Base::Database;


    std::unique_lock<std::mutex> lock_database(_mutex_task_database);
    Task& task = _task_map.emplace(std::piecewise_construct, std::forward_as_tuple(_new_task_id),
        std::forward_as_tuple(_new_task_id,task_name, user, query_expression_str, job_version_after, expire_time)).first->second;
    _task_pointer_list.emplace_back(&task);
    user.append_task(&task);
    lock_database.unlock();


    MultiSqlTask *sqlTask = new MultiSqlTask;

    static const std::string kSql1("INSERT INTO tjh_task_detail(task_id,task_name,user_name,query_expression,job_version_after,expire_time) VALUES(?,?,?,?,?,?);");
    sqlTask->emplace(SqlTaskType::DML, kSql1, _new_task_id,task_name, user.get_name(), query_expression_str, job_version_after, expire_time);

    _new_task_id++;

    static const std::string kSql2("UPDATE tjh_task_other SET new_task_id = ?");
    sqlTask->emplace(SqlTaskType::DML, kSql2, _new_task_id);


    Common::DBManager::EnqueueMultiTask(SqlTaskPriority::Low, sqlTask);

    //回调方法告知有新任务
    _task_added_callback(task);

    return task;
}

Task* Task::GetPointer(int id)
{
    auto it = _task_map.find(id);
    if (it != _task_map.end())
    {
        Task* ret = &it->second;
        return ret->m_exist ? ret : nullptr;
    }
    else
    {
        return nullptr;
    }
}

Task& Task::Get(int id)
{
    assert(ContainsKey(id));
    return *GetPointer(id);
}

void Task::Erase(int id)
{
    using Common::DBManager;
    using namespace Base::Database;
    Task* p_task = GetPointer(id);
    if (p_task != nullptr)
    {
        p_task->m_user.erase_task(p_task);
        p_task->m_exist = false;

        static const std::string kSql("DELETE FROM tjh_task_detail where task_id = ?;");
        Common::DBManager::EnqueueDMLTask(SqlTaskPriority::Normal, kSql, id);
    }

}

std::vector<Task*> Task::ForEach(const std::function<void(Task&)>& func)
{
    std::vector<Task*> task_pointer_list_clone;
    {
        std::unique_lock<std::mutex> lock(_mutex_task_database);
        task_pointer_list_clone = _task_pointer_list;
    }

    for (Task* p_task : task_pointer_list_clone)
    {
        if (p_task->m_exist) func(*p_task);
    }

    return task_pointer_list_clone;

}

void Task::AsyncLoadFromDataBase()
{
    using namespace Base::Database;
    using Common::DateHelper;

    static const std::string kSqlQueryTaskOther("SELECT new_task_id FROM tjh_task_other;");
    Common::DBManager::EnqueueDQLTask(SqlTaskPriority::Highest, [](const DQLSqlTask& task){
        deal_with_resultset_init(task, [](sql::ResultSet* res){
            if (res->next())
            {
                _new_task_id = res->getInt("new_task_id");
            }
            else
            {
                static const std::string kSql("INSERT INTO tjh_task_other(new_task_id) VALUES(?);");
                Common::DBManager::EnqueueDMLTask(SqlTaskPriority::Normal, kSql, _new_task_id);
            }
        });
    }, kSqlQueryTaskOther);


    //读取任务数据
    static const std::string kSqlQueryTaskDetail("SELECT task_id,task_name,user_name,query_expression,job_version_after,expire_time FROM tjh_task_detail;");
    Common::DBManager::EnqueueDQLTask(SqlTaskPriority::Highest, [](const DQLSqlTask& task){

        foreach_line_in_resultset_init(task, [](sql::ResultSet* res){
            int id = res->getInt("task_id");        
            //正常情况下id是不可能大于_new_task_id的,但为了防止人为修改数据库造成的异常情况,加个判断
            if (id > _new_task_id)_new_task_id = id;

            std::string user_name = res->getString("user_name");
            User* p_user = User::GetPointer(user_name);
            if (p_user != nullptr)
            {
                Task& task = _task_map.emplace(std::piecewise_construct, std::forward_as_tuple(id),
                    std::forward_as_tuple(id, res->getString("task_name"), *p_user,
                    res->getString("query_expression"),res->getUInt64("job_version_after"),
                    DateHelper::ToDateTime(res->getString("expire_time")))).first->second;
                _task_pointer_list.emplace_back(&task);
                p_user->append_task(&task);                    
            }
            else
            {
                G::LogOfProgram().error("增加任务信息时数据库异常,出现了不存在的user");
            }


        });

        for (int i = 0, length = _task_pointer_list.size(); i < length; i++)
        {
            static const std::string kSqlQueryTaskJobQueryResultDetail("SELECT job_id,result_index FROM tjh_task_queryresult_item where task_id = ? ORDER BY result_index;");

            Task& task = *_task_pointer_list[i];

            bool is_last = i == length - 1;
            Common::DBManager::EnqueueDQLTask(SqlTaskPriority::Highest, [is_last, &task](const DQLSqlTask& sqlTask){
                foreach_line_in_resultset_init(sqlTask, [&task](sql::ResultSet* res){
                    int job_id = res->getInt("job_id");
                    const Job* p_job = Job::GetPointer(job_id);
                    if (p_job != nullptr)
                    {
                        task.m_query_result.emplace_back(p_job);
                    }
                    else
                    {
                        G::LogOfProgram().error("增加职位查询结果信息时数据库异常,出现了不存在的job");
                    }
                });

                if (is_last)
                {
                    _is_ready = true;
                    _cond_ready.notify_one();//通知主线程整个查询任务已经结束
                    G::LogOfProgram().info("Task库已完成初始化");
                }

            }, kSqlQueryTaskJobQueryResultDetail, task.m_id);
        }

    }, kSqlQueryTaskDetail);

}

void Task::WaitForReady()
{
    std::mutex mutex;
    std::unique_lock<std::mutex> lock(mutex);
    _cond_ready.wait(lock, [](){ return _is_ready; });
}

results matching ""

    No results matching ""