Skip to content

Commit

Permalink
Merge pull request #9 from haskell-works/decode-with-deconflict
Browse files Browse the repository at this point in the history
Deconflict
  • Loading branch information
AlexeyRaga authored Apr 10, 2018
2 parents 404cc41 + 9134d16 commit 41f0697
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 15 deletions.
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ services:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: confluentinc/cp-kafka
image: confluentinc/cp-kafka:3.1.1
hostname: kafka
ports:
- 9092:9092
Expand All @@ -24,7 +24,7 @@ services:


schema-registry:
image: confluentinc/cp-schema-registry
image: confluentinc/cp-schema-registry:3.1.1
hostname: schema-registry
ports:
- 8081:8081
Expand Down
3 changes: 2 additions & 1 deletion hw-kafka-avro.cabal
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: hw-kafka-avro
version: 1.3.0
version: 1.4.0
synopsis: Avro support for Kafka infrastructure
description: Please see README.md
homepage: https://github.com/haskell-works/hw-kafka-avro#readme
Expand Down Expand Up @@ -41,6 +41,7 @@ library
semigroups,
servant,
servant-client,
tagged,
text,
transformers,
unordered-containers
Expand Down
78 changes: 78 additions & 0 deletions scripts/hackage-docs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/bin/bash
set -e

# This if stack-enabled fork of https://github.com/ekmett/lens/blob/master/scripts/hackage-docs.sh

if [ "$#" -ne 1 ]; then
echo "Usage: scripts/hackage-docs.sh HACKAGE_USER"
exit 1
fi

user=$1

cabal_file=$(find . -maxdepth 1 -name "*.cabal" -print -quit)
if [ ! -f "$cabal_file" ]; then
echo "Run this script in the top-level package directory"
exit 1
fi

pkg=$(awk -F ":[[:space:]]*" 'tolower($1)=="name" { print $2 }' < "$cabal_file")
ver=$(awk -F ":[[:space:]]*" 'tolower($1)=="version" { print $2 }' < "$cabal_file")

if [ -z "$pkg" ]; then
echo "Unable to determine package name"
exit 1
fi

if [ -z "$ver" ]; then
echo "Unable to determine package version"
exit 1
fi

echo "Detected package: $pkg-$ver"

dir=$(mktemp -d build-docs.XXXXXX)
trap 'rm -r "$dir"' EXIT

export PATH=$(stack path --bin-path)

ghc --version
cabal --version
stack --version

if haddock --hyperlinked-source >/dev/null
then
echo "Using fancy hyperlinked source"
HYPERLINK_FLAG="--haddock-option=--hyperlinked-source"
else
echo "Using boring hyperlinked source"
HYPERLINK_FLAG="--hyperlink-source"
fi

# Cabal dist in temporary location
builddir=$dir/dist

# Build dependencies haddocks with stack, so we get links
stack haddock --only-dependencies

# Configure using stack databases
snapshotpkgdb=$(stack path --snapshot-pkg-db)
localpkgdb=$(stack path --local-pkg-db)
cabal configure -v2 --builddir=$builddir --package-db=clear --package-db=global --package-db=$snapshotpkgdb --package-db=$localpkgdb

# Build Hadckage compatible docs
cabal haddock -v2 --builddir=$builddir $HYPERLINK_FLAG --html-location='/package/$pkg-$version/docs' --contents-location='/package/$pkg-$version'

# Copy into right directory
cp -R $builddir/doc/html/$pkg/ $dir/$pkg-$ver-docs

# Tar and gzip
tar cvz -C $dir --format=ustar -f $dir/$pkg-$ver-docs.tar.gz $pkg-$ver-docs

# Upload
curl -X PUT \
-H 'Content-Type: application/x-tar' \
-H 'Content-Encoding: gzip' \
-u "$user" \
--data-binary "@$dir/$pkg-$ver-docs.tar.gz" \
"https://hackage.haskell.org/package/$pkg-$ver/docs"
30 changes: 20 additions & 10 deletions src/Kafka/Avro/Decode.hs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
{-# LANGUAGE ScopedTypeVariables #-}
module Kafka.Avro.Decode
(
DecodeError(..)
, decodeWithSchema, extractSchemaId
) where

import Control.Arrow (left)
import Control.Monad.IO.Class (MonadIO)
import Data.Avro as A (FromAvro, Result (..))
import Data.Avro as A (FromAvro, HasAvroSchema (..), Result (..), fromAvro)
import qualified Data.Avro as A (decodeWithSchema)
import qualified Data.Avro.Decode as A (decodeAvro)
import qualified Data.Avro.Deconflict as A (deconflict)
import Data.Avro.Schema (Schema)
import Data.Bits (shiftL)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL hiding (zipWith)
import Data.Int
import Data.Tagged (Tagged, untag)
import Kafka.Avro.SchemaRegistry

data DecodeError = DecodeRegistryError SchemaRegistryError
Expand All @@ -31,11 +36,18 @@ decodeWithSchema sr bs =
case schemaData of
Left err -> return $ Left err
Right (sid, payload) -> do
res <- leftMap DecodeRegistryError <$> loadSchema sr sid
return $ res >>= decode payload
res <- left DecodeRegistryError <$> loadSchema sr sid
return $ res >>= flip decodeWithDeconflict payload
where
schemaData = maybe (Left BadPayloadNoSchemaId) Right (extractSchemaId bs)
decode p s = resultToEither s (A.decodeWithSchema s p)

decodeWithDeconflict :: forall a. (FromAvro a) => Schema -> ByteString -> Either DecodeError a
decodeWithDeconflict writerSchema bs =
let readerSchema = untag (schema :: Tagged a Schema)
in left (DecodeError readerSchema) $ do
raw <- A.decodeAvro writerSchema bs
val <- A.deconflict writerSchema readerSchema raw
resultToEither readerSchema (fromAvro val)

extractSchemaId :: ByteString -> Maybe (SchemaId, ByteString)
extractSchemaId bs = do
Expand All @@ -48,11 +60,9 @@ extractSchemaId bs = do
let int = sum $ zipWith shiftL ints [0, 8, 16, 24]
return (SchemaId int, b4)

leftMap :: (e -> e') -> Either e r -> Either e' r
leftMap _ (Right r) = Right r
leftMap f (Left e) = Left (f e)

resultToEither :: Schema -> A.Result a -> Either DecodeError a
resultToEither :: Schema -> A.Result a -> Either String a
resultToEither sc res = case res of
Success a -> Right a
Error msg -> Left $ DecodeError sc msg
Error msg -> Left msg


4 changes: 2 additions & 2 deletions stack.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
resolver: lts-9.9
resolver: lts-10.8

packages:
- '.'

# Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3)
extra-deps:
- avro-0.2.0.0
- avro-0.2.1.1
- pure-zlib-0.6

# Override default flag values for local packages and extra-deps
Expand Down

0 comments on commit 41f0697

Please sign in to comment.