From 884163fcf94f1ca7799da56de86b90321396301b Mon Sep 17 00:00:00 2001 From: SherwoodCH Date: Thu, 16 Apr 2015 23:58:37 +0200 Subject: [PATCH] introduce offset handling --- src/HMB/Internal/Handler.hs | 14 +--------- src/HMB/Internal/Log/Writer.hs | 49 +++++++++++++++++++++++++--------- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/src/HMB/Internal/Handler.hs b/src/HMB/Internal/Handler.hs index 3c72158..bac7a92 100644 --- a/src/HMB/Internal/Handler.hs +++ b/src/HMB/Internal/Handler.hs @@ -86,15 +86,9 @@ sendProduceResponse socket responsemessage = do -- FetchRequest ----------------- -requestLog :: Request -> IO [Log] -requestLog req = mapM readLog [ - (BS.unpack(topicName x), fromIntegral(rqFtPartitionNumber y)) - | x <- rqFtTopics req, y <- partitions x - ] - packPartitionsToFtRsPayload :: TopicName -> Partition -> IO RsFtPayload packPartitionsToFtRsPayload t p = do - log <- readLog (BS.unpack $ t, fromIntegral $ rqFtPartitionNumber p) + log <- readLog (BS.unpack $ t, fromIntegral $ rqFtPartitionNumber p, fromIntegral $ rqFtFetchOffset p) return $ RsFtPayload 0 0 @@ -102,9 +96,6 @@ packPartitionsToFtRsPayload t p = do (fromIntegral $ BL.length $ foldl (\acc ms -> BL.append acc (buildMessageSet ms)) BL.empty log) log ---packLogToFtRsPayload :: Log -> [RsFtPayload] ---packLogToFtRsPayload log = packMsToFtRsPayload log - packLogToFtRs :: Topic -> IO Response packLogToFtRs t = do rss <- (mapM (packPartitionsToFtRsPayload $ topicName t) $ partitions t) @@ -116,11 +107,8 @@ packLogToFtRs t = do handleFetchRequest :: Request -> Socket -> IO() handleFetchRequest req sock = do - logs <- requestLog req rs <- mapM packLogToFtRs (rqFtTopics req) let rsms = ResponseMessage 0 1 rs - print rsms let msg = buildFtRsMessage rsms SBL.sendAll sock msg - print $ "send resp:" ++ show msg diff --git a/src/HMB/Internal/Log/Writer.hs b/src/HMB/Internal/Log/Writer.hs index 9bdb7c5..3cc34bf 100644 --- a/src/HMB/Internal/Log/Writer.hs +++ b/src/HMB/Internal/Log/Writer.hs @@ -6,7 +6,7 @@ import qualified Data.ByteString.Lazy as BL import Data.Binary.Put import System.Directory import Control.Conditional - +import Control.Monad import Kafka.Protocol import Data.Binary.Get @@ -25,22 +25,45 @@ logFile o = (show $ fromIntegral o) ++ ".log" getPath :: String -> String -> String getPath folder file = folder ++ "/" ++ file -buildLog :: Log -> BL.ByteString -buildLog [] = BL.empty -buildLog (x:xs) = - BL.append (buildMessageSet x) (buildLog xs) +buildLog :: Offset -> Log -> BL.ByteString +buildLog o [] = BL.empty +buildLog o (x:xs) = + (BL.append (buildLogEntry x o) (buildLog (o + 1) xs)) writeLog :: MessageInput -> IO() writeLog (topicName, partitionNumber, log) = do createDirectoryIfMissing False $ logFolder topicName partitionNumber let filePath = getPath (logFolder topicName partitionNumber) (logFile 0) ifM (doesFileExist filePath) - (BL.appendFile filePath $ buildLog log) - (BL.writeFile filePath $ buildLog log) + (appendToLog filePath (topicName,partitionNumber, log)) + (newLog filePath (topicName,partitionNumber, log)) +appendToLog :: String -> MessageInput -> IO() +appendToLog filepath (t, p, log) = do + o <- getMaxOffsetOfLog (t, p, log) + print o --TODO: is needed for preventing file lock ... + let l = buildLog (o + 1) log + BL.appendFile filepath l + return () + +newLog :: String -> MessageInput -> IO() +newLog filepath (t, p, log) = do + let l = buildLog 0 log + BL.writeFile filepath l + return () + +maxOffset :: [Offset] -> Offset +maxOffset [x] = x +maxOffset (x:xs) = max x (maxOffset xs) + +getMaxOffsetOfLog :: MessageInput -> IO Offset +getMaxOffsetOfLog (t, p, _) = do + log <- readLogFromBeginning (t,p) --TODO: optimieren, dass nich gesamter log gelesen werden muss + return (maxOffset $ [ offset x | x <- log ]) --- todo: move to reader + +-- todo: move to reader getLog :: Get Log getLog = do empty <- isEmpty @@ -55,9 +78,11 @@ parseLog a = do input <- BL.readFile a return (runGet getLog input) - -readLog :: (String, Int) -> IO Log -readLog (t, p) = parseLog $ +readLogFromBeginning :: (String, Int) -> IO Log +readLogFromBeginning (t, p) = parseLog $ getPath (logFolder t p) (logFile 0) - +readLog :: (String, Int, Int) -> IO Log +readLog (t, p, o) = do + log <- readLogFromBeginning (t,p) + return ([ x | x <- log, fromIntegral(offset x) >= o])