{-# LANGUAGE ScopedTypeVariables #-}
module Control.RateLimit (
generateRateLimitedFunction
, RateLimit(..)
, ResultsCombiner
, dontCombine
, rateLimitInvocation
, rateLimitExecution
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (void)
import Data.Functor (($>))
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Time.Units
-- | How a rate limit is enforced; the payload @a@ is the length of the
-- rate-limiting period.
data RateLimit a
  = PerInvocation a
    -- ^ Space out the /starts/ of the action: each run of the action is
    -- forked onto its own thread, so the worker moves on to the next
    -- request without waiting for the action to finish.
  | PerExecution a
    -- ^ Run the action on the worker thread itself, so the next request is
    -- not picked up until the current action has completed.
-- | A strategy for merging two queued requests into one.
--
-- Given an earlier request and the request queued after it, return 'Nothing'
-- to keep them separate, or 'Just' a combined request together with a
-- function that splits the combined response into
-- @(response for the earlier request, response for the later request)@.
type ResultsCombiner req resp = req -> req -> Maybe (req, resp -> (resp, resp))
-- | A 'ResultsCombiner' that never merges requests: it ignores both
-- arguments and always answers 'Nothing'.
dontCombine :: ResultsCombiner a b
dontCombine _ _ = Nothing
-- | Build a rate-limited version of @action@ using the 'PerInvocation'
-- policy and no request combining.
--
-- This is 'generateRateLimitedFunction' applied to
-- @'PerInvocation' pertime@ and 'dontCombine'.
rateLimitInvocation :: TimeUnit t
                    => t                    -- ^ rate-limiting period
                    -> (req -> IO resp)     -- ^ action to rate limit
                    -> IO (req -> IO resp)
rateLimitInvocation pertime action =
  generateRateLimitedFunction (PerInvocation pertime) action dontCombine
-- | Build a rate-limited version of @action@ using the 'PerExecution'
-- policy and no request combining.
--
-- This is 'generateRateLimitedFunction' applied to
-- @'PerExecution' pertime@ and 'dontCombine'.
rateLimitExecution :: TimeUnit t
                   => t                    -- ^ rate-limiting period
                   -> (req -> IO resp)     -- ^ action to rate limit
                   -> IO (req -> IO resp)
rateLimitExecution pertime action =
  generateRateLimitedFunction (PerExecution pertime) action dontCombine
-- | Wrap @action@ so that calls to it are throttled according to
-- @ratelimit@.
--
-- A request channel is created and one worker thread is forked to drain it.
-- The returned function enqueues its argument together with a fresh 'MVar'
-- and blocks until the worker (or a thread the worker forks) fills that
-- 'MVar' with the response.
--
-- Before running a request the worker asks @combiner@ whether the requests
-- queued right behind it can be merged into it; merged requests share a
-- single run of @action@ and the response is split back out to each caller.
--
-- NOTE(review): nothing here catches exceptions thrown by @action@. Under
-- 'PerExecution' the action runs on the worker thread itself, so an
-- exception would end the worker and leave callers waiting on their 'MVar'.
generateRateLimitedFunction :: forall req resp t
                             . TimeUnit t
                            => RateLimit t
                            -> (req -> IO resp)
                            -> ResultsCombiner req resp
                            -> IO (req -> IO resp)
generateRateLimitedFunction ratelimit action combiner = do
  chan <- atomically newTChan
  void $ forkIO $ runner Nothing 0 chan
  return $ resultFunction chan
  where
    -- Wall-clock time in whole microseconds since the POSIX epoch.
    -- Computed in 'Integer': converting the underlying picosecond count
    -- through 'fromEnum' would overflow a 64-bit 'Int'.
    currentMicroseconds :: IO Integer
    currentMicroseconds = floor . (* 1000000) <$> getPOSIXTime

    -- Worker loop; never returns.
    --
    -- @mLastRun@ is the time the previous action was started (or 'Nothing'
    -- before the first one) and @lastAllowance@ is the number of
    -- microseconds carried over from the previous iteration (oversleep, or
    -- unused idle time) that is credited against the next delay.
    runner :: Maybe Integer -> Integer -> TChan (req, MVar resp) -> IO a
    runner mLastRun lastAllowance chan = do
      (req, respMV) <- atomically $ readTChan chan
      let baseHandler = putMVar respMV

      -- Work out how long we still have to wait before running.
      beforeWait <- currentMicroseconds
      let targetPeriod = toMicroseconds $ getRate ratelimit
          timeSinceLastRun = case mLastRun of
            Just lastRun -> beforeWait - lastRun
            -- Nothing has run yet: behave as if a full period has already
            -- elapsed so that the first request is not delayed.
            Nothing -> targetPeriod
          targetDelay = targetPeriod - timeSinceLastRun - lastAllowance

      -- Sleep if necessary and compute the allowance for the next round.
      nextAllowance <- if targetDelay < 0
        then pure $ abs targetDelay -- unused time is carried forward
        else do
          threadDelay $ fromIntegral targetDelay
          afterWait <- currentMicroseconds
          let slept = afterWait - beforeWait
              overslept = slept - targetDelay
          return overslept

      -- Fold any combinable queued requests into this one, then run it.
      (req', finalHandler) <- updateRequestWithFollowers chan req baseHandler
      let run = action req' >>= finalHandler
      beforeRun <- currentMicroseconds
      if shouldFork ratelimit
        then void $ forkIO run
        else run
      runner (Just beforeRun) nextAllowance chan

    -- Repeatedly try to merge the request at the head of the channel into
    -- @req@. Returns the (possibly combined) request and a handler that
    -- delivers the eventual response to every caller that was merged in.
    -- A request that cannot be combined is pushed back onto the channel.
    updateRequestWithFollowers :: TChan (req, MVar resp)
                               -> req
                               -> (resp -> IO ())
                               -> IO (req, resp -> IO ())
    updateRequestWithFollowers chan req handler = do
      mCombinedAndMV <- atomically $ do
        mNext <- tryReadTChan chan
        case mNext of
          Nothing -> return Nothing
          Just tup@(next, nextRespMV) ->
            case combiner req next of
              Nothing -> unGetTChan chan tup $> Nothing
              Just combined -> return $ Just (combined, nextRespMV)
      case mCombinedAndMV of
        Nothing -> return (req, handler)
        Just ((req', splitResponse), nextRespMV) ->
          updateRequestWithFollowers chan req' $ \resp -> do
            let (theirs, mine) = splitResponse resp
            putMVar nextRespMV mine
            handler theirs

    -- Whether the action is run on a freshly forked thread.
    shouldFork :: RateLimit t -> Bool
    shouldFork (PerInvocation _) = True
    shouldFork (PerExecution _) = False

    -- Extract the period from either kind of rate limit.
    getRate :: RateLimit t -> t
    getRate (PerInvocation x) = x
    getRate (PerExecution x) = x

    -- The function handed back to the caller: enqueue the request with a
    -- fresh response slot and block until that slot is filled.
    resultFunction :: TChan (req, MVar resp) -> req -> IO resp
    resultFunction chan req = do
      respMV <- newEmptyMVar
      atomically $ writeTChan chan (req, respMV)
      takeMVar respMV