Skip to content

Instantly share code, notes, and snippets.

@IronsDu
Created September 3, 2019 11:13
Show Gist options
  • Select an option

  • Save IronsDu/d8656d657b0f8296a7b8f0a63ba1eebd to your computer and use it in GitHub Desktop.

Select an option

Save IronsDu/d8656d657b0f8296a7b8f0a63ba1eebd to your computer and use it in GitHub Desktop.
slot_timer
#include <vector>
#include <chrono>
#include <mutex>
#include <functional>
#include <queue>
#include <time.h>
#include <random>
#include <iostream>
class Timer final
{
public:
using Ptr = std::shared_ptr<Timer>;
using WeakPtr = std::weak_ptr<Timer>;
using Callback = std::function<void(void)>;
Timer(std::chrono::steady_clock::time_point startTime,
std::chrono::nanoseconds lastTime,
Callback&& callback) noexcept
:
mCallback(std::forward<Callback>(callback)),
mEndTime(startTime + lastTime)
{
}
const std::chrono::steady_clock::time_point& getEndTime() const
{
return mEndTime;
}
std::chrono::nanoseconds getLeftTime() const
{
return getEndTime() - std::chrono::steady_clock::now();
}
void cancel()
{
std::call_once(mExecuteOnceFlag,
[this]() {
mCallback = nullptr;
});
}
private:
void operator() ()
{
std::call_once(mExecuteOnceFlag,
[this]() {
mCallback();
mCallback = nullptr;
});
}
void setSeqId(size_t seqId)
{
mSeqId = seqId;
}
size_t getSeqId() const
{
return mSeqId;
}
std::once_flag mExecuteOnceFlag;
Callback mCallback;
const std::chrono::steady_clock::time_point mEndTime;
size_t mSeqId = 0;
friend class TimerMgr;
};
class TimerMgr final
{
public:
using Ptr = std::shared_ptr<TimerMgr>;
template<typename F, typename ...TArgs>
Timer::WeakPtr addTimer(std::chrono::nanoseconds timeout, F&& callback, TArgs&& ...args)
{
auto timer = std::make_shared<Timer>(
std::chrono::steady_clock::now(),
std::chrono::nanoseconds(timeout),
std::bind(std::forward<F>(callback), std::forward<TArgs>(args)...));
addTimer(timer);
return timer;
}
void addTimer(const Timer::Ptr& timer)
{
//TODO::´æÔÚ»ØÈÆÎÊÌâµ¼ÖÂ˳Ðò²¢²»Ò»¶¨Äܱ£Ö¤
//timer->setSeqId(mSeqId++);
//TODO::²»Äܱ£Ö¤¶à´ÎaddTimerÖ®¼ä´æÔÚ¶¨Ê±Æ÷µ÷¶È²Ù×÷ʱµÄÏàͬʱ¼ä¶¨Ê±Æ÷µÄÏȺó˳Ðò
timer->setSeqId(mTimers.size());
mTimers.push(timer);
}
void schedule(std::chrono::steady_clock::time_point expiration)
{
while (!mTimers.empty())
{
auto tmp = mTimers.top();
if (tmp->getLeftTime() > std::chrono::nanoseconds::zero())
{
break;
}
if (tmp->getEndTime() > expiration)
{
break;
}
mTimers.pop();
(*tmp)();
}
}
bool isEmpty() const
{
return mTimers.empty();
}
// if timer empty, return zero
std::chrono::nanoseconds nearLeftTime() const
{
if (mTimers.empty())
{
return std::chrono::nanoseconds::zero();
}
auto result = mTimers.top()->getLeftTime();
if (result < std::chrono::nanoseconds::zero())
{
return std::chrono::nanoseconds::zero();
}
return result;
}
void clear()
{
decltype(mTimers) tmp;
mTimers.swap(tmp);
}
private:
class CompareTimer
{
public:
bool operator() (const Timer::Ptr& left, const Timer::Ptr& right) const
{
return (left->getEndTime() > right->getEndTime()) ||
(left->getEndTime() == right->getEndTime() && left->getSeqId() > right->getSeqId());
}
};
std::priority_queue<Timer::Ptr, std::vector<Timer::Ptr>, CompareTimer> mTimers;
size_t mSeqId = 0;
};
class ShardingTimerMgr
{
public:
ShardingTimerMgr(std::chrono::nanoseconds interval, size_t slotNum)
:
mInterval(interval),
mTimerMgrList(slotNum, std::make_shared<TimerMgr>()),
mCurrentSlot(0),
mCurrentSlotStartTime(std::chrono::steady_clock::now())
{
if (interval == std::chrono::nanoseconds::zero())
{
throw std::runtime_error("interval is zero");
}
if (slotNum == 0)
{
throw std::runtime_error("slot num is zero");
}
}
virtual ~ShardingTimerMgr() = default;
void addTimer(const Timer::Ptr& timer)
{
const auto diff = timer->getEndTime() - mCurrentSlotStartTime;
const auto index = ((diff / mInterval) + mCurrentSlot) % mTimerMgrList.size();
mTimerMgrList[index]->addTimer(timer);
}
void schedule()
{
while (true)
{
const auto expiration = mCurrentSlotStartTime + mInterval;
//expirationΪµ±Ç°slot µ÷¶ÈµÄ½ØÖ¹Ê±¼ä(´óÓÚ´Ëʱ¼äµÄtimer²»Äܱ»µ÷¶È)
//ÒòΪ¿ÉÄÜÓÉÓÚ½ø³ÌÆäËûÂß¼­»òÕßϵͳhangµÈÎÊÌâµ¼Öµ÷¶Èʱ»úÑÓ³Ù£¬µ¼Öµ±Ç°slotµÄºÜ¶àtimer¶¼ÒѾ­µ½ÆÚ
//Èç¹û²»Ê¹ÓÃexpirationÅжϣ¬ÄÇô¿ÉÄÜtimerµÄµ÷¶ÈûÓÐÑϸñ°´ÕÕÏȺó˳ÐòÖ´ÐÐ(Òò´ËÕý³£Çé¿öÏÂÓ¦¸ÃÈ¥Ö´ÐкóÐøslot)
mTimerMgrList[mCurrentSlot]->schedule(expiration);
if (std::chrono::steady_clock::now() < expiration)
{
break;
}
mCurrentSlot++;
mCurrentSlot %= mTimerMgrList.size();
mCurrentSlotStartTime += mInterval;
}
}
bool isEmpty() const
{
for (const auto& timerMgr : mTimerMgrList)
{
if (!timerMgr->isEmpty())
{
return false;
}
}
return true;
}
std::chrono::nanoseconds nearLeftTime()
{
// ÏÈÖ´ÐÐschedule£¬ÒòΪ¿ÉÄÜ´æÔÚ£¨µÈ´ýÁ˱Ƚϳ¤Ê±¼äûÓÐschedule£©ÆäËûslot´æÔÚÒѾ­µ½ÆÚ(ÇÒÏÈÓÚµ±Ç°slot)µÄ¶¨Ê±Æ÷¡£
// µ±È»£¬ÎÒÃÇÒ²¿ÉÒÔ±éÀúmTimerMgrList»ñÈ¡ÆäÖÐ×îСµÄ³¬Ê±Ê±¼ä
if (true)
{
schedule();
return mTimerMgrList[mCurrentSlot]->nearLeftTime();
}
else
{
auto nearLeftTime = std::chrono::nanoseconds::zero();
for (const auto& timerMgr : mTimerMgrList)
{
nearLeftTime = std::min<std::chrono::nanoseconds>(nearLeftTime, timerMgr->nearLeftTime());
}
return nearLeftTime;
}
}
private:
const std::chrono::nanoseconds mInterval;
const std::vector<TimerMgr::Ptr> mTimerMgrList;
size_t mCurrentSlot;
std::chrono::steady_clock::time_point mCurrentSlotStartTime;
};
int main()
{
srand(time(nullptr));
ShardingTimerMgr timerMgr(std::chrono::seconds(1), 5);
const auto nowTime = std::chrono::steady_clock::now();
std::vector< std::chrono::milliseconds> lastTimeList;
const size_t a = 1000;
const size_t b = 10;
for (size_t i = 0; i < a; i++)
{
const auto lastTime = std::chrono::seconds(rand() % 20) + std::chrono::milliseconds(rand() % 1000);
auto timer = std::make_shared<Timer>(nowTime,
lastTime,
[=, &lastTimeList]() {
lastTimeList.push_back(lastTime);
});
timerMgr.addTimer(timer);
}
for (size_t i = 0; i < b; i++)
{
const auto lastTime = std::chrono::seconds(5);
auto timer = std::make_shared<Timer>(nowTime,
lastTime,
[=, &lastTimeList] () {
lastTimeList.push_back(lastTime);
std::cout << "hello: " << i << std::endl;
});
timerMgr.addTimer(timer);
}
while (!timerMgr.isEmpty())
{
timerMgr.schedule();
}
if (lastTimeList.size() != (a + b))
{
throw std::runtime_error("count error");
}
auto minLastTime = std::chrono::milliseconds::zero();
for (const auto& v : lastTimeList)
{
if (v < minLastTime)
{
throw std::runtime_error("timeout error");
}
minLastTime = v;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment