Skip to content

Commit

Permalink
Better requeue handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Jleagle committed Jul 2, 2018
1 parent ba41048 commit 51b5c8f
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 55 deletions.
75 changes: 51 additions & 24 deletions Consumers/AbstractConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,40 @@ public abstract class AbstractConsumer
{
// Consts
public const string queueApps = "Apps";
public const string queueAppsData = "Apps_Data";
protected const string queueAppsData = "Apps_Data";

public const string queuePackages = "Packages";
public const string queuePackagesData = "Packages_Data";
protected const string queuePackagesData = "Packages_Data";

public const string queueProfiles = "Profiles";
public const string queueProfilesData = "Profiles_Data";
protected const string queueProfilesData = "Profiles_Data";

public const string queueChangesData = "Changes_Data";

private const string queueAppend = "Steam_";

// Queue -> Consumer
public static readonly Dictionary<string, AbstractConsumer> consumers = new Dictionary<string, AbstractConsumer>
{
{queueApps, new AppConsumer()},
{queuePackages, new PackageConsumer()},
{queueProfiles, new ProfileConsumer()}
};

//
private static readonly ConnectionFactory connectionFactory = new ConnectionFactory
{
HostName = Environment.GetEnvironmentVariable("STEAM_RABBIT_HOST"),
UserName = Environment.GetEnvironmentVariable("STEAM_RABBIT_USER"),
Password = Environment.GetEnvironmentVariable("STEAM_RABBIT_PASS")
};

// Abstracts
protected abstract Task<bool> HandleMessage(BasicDeliverEventArgs msg);
// Abstract
protected abstract Task<Tuple<bool, bool>> HandleMessage(BasicDeliverEventArgs msg);

// Statics
public static void startConsumers()
{
var consumers = new Dictionary<string, AbstractConsumer>
{
{queueApps, new AppConsumer()},
{queuePackages, new PackageConsumer()},
{queueProfiles, new ProfileConsumer()}
};

foreach (var entry in consumers)
{
var thread = new Thread(() =>
Expand All @@ -55,24 +60,24 @@ public static void startConsumers()
}
}

private static (IConnection, IModel) getConnection()
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();

return (connection, channel);
}

public static void Produce(string queue, string data)
{
if (data.Length == 0)
{
return;
}

var x = getConnection();
var connection = x.Item1;
var channel = x.Item2;

channel.QueueDeclare(queueAppend + queue, true, false, false);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

var bytes = Encoding.UTF8.GetBytes(data);
channel.BasicPublish("", queueAppend + queue, null, bytes);
channel.BasicPublish("", queueAppend + queue, properties, bytes);

channel.Close();
connection.Close();
Expand All @@ -97,18 +102,40 @@ private void Consume(string queue)
var consumer = new EventingBasicConsumer(channel);
consumer.Received += delegate(object chan, BasicDeliverEventArgs ea)
{
var success = HandleMessage(ea);
if (success.Result)
// Check logged in to Steam
if (!Steam.steamClient.IsConnected || !Steam.isLoggedOn)
{
Thread.Sleep(TimeSpan.FromSeconds(1));
Log.GoogleInfo("Waiting to login before consuming");
channel.BasicNack(ea.DeliveryTag, false, true);
return;
}

// Consume message
var response = HandleMessage(ea);
var ack = response.Result.Item1;
var requeue = response.Result.Item2;

if (ack)
{
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
channel.BasicNack(ea.DeliveryTag, false, true);
Thread.Sleep(TimeSpan.FromSeconds(1));
channel.BasicNack(ea.DeliveryTag, false, requeue);
}
};

channel.BasicConsume(queue, false, consumer);
}

private static (IConnection, IModel) getConnection()
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();

return (connection, channel);
}
}
}
35 changes: 26 additions & 9 deletions Consumers/AppConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,38 @@
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;
using SteamUpdater.Consumers.Messages;

namespace SteamUpdater.Consumers
{
public class AppConsumer : AbstractConsumer
{
protected override async Task<bool> HandleMessage(BasicDeliverEventArgs msg)
protected override async Task<Tuple<bool, bool>> HandleMessage(BasicDeliverEventArgs msg)
{
var msgBody = Encoding.UTF8.GetString(msg.Body);
var ids = msgBody.Split(",");
var IDs = msgBody.Split(",");

if (ids.Length == 0)
if (msgBody.Length == 0 || IDs.Length == 0)
{
return true;
return new Tuple<bool, bool>(false, false);
}

var appIDs = Array.ConvertAll(ids, Convert.ToUInt32);
// Requeue anything over 100
if (IDs.Length > 100)
{
Produce(queueApps, string.Join(",", IDs.Skip(100).ToArray()));
IDs = IDs.Take(100).ToArray();
}

var appIDs = Array.ConvertAll(IDs, Convert.ToUInt32);
var JobID = Steam.steamApps.PICSGetProductInfo(appIDs, new List<uint>(), false);
var callback = await JobID;

if (!callback.Complete)
{
return false;
return new Tuple<bool, bool>(false, true);
}

foreach (var result in callback.Results)
Expand All @@ -40,19 +48,28 @@ protected override async Task<bool> HandleMessage(BasicDeliverEventArgs msg)

Produce(queueAppsData, JsonConvert.SerializeObject(message));
}

// Log unknowns
foreach (var entry in result.UnknownApps)
{
Log.GoogleInfo("Unknown app: " + entry);
}

foreach (var entry in result.UnknownPackages)
{
Log.GoogleInfo("Unknown package: " + entry);
}
}

return true;
return new Tuple<bool, bool>(true, false);
}

protected void GetGlobalAchievementPercentagesForApp()
{

}

protected void GetSchemaForGame()
{

}
}
}
6 changes: 6 additions & 0 deletions Consumers/Messages/AbstractMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SteamUpdater.Consumers.Messages
{
public abstract class AbstractMessage
{
}
}
4 changes: 1 addition & 3 deletions Consumers/Messages/AppDataMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

namespace SteamUpdater.Consumers.Messages
{
public class AppDataMessage
public class AppDataMessage : AbstractMessage
{
public SteamApps.PICSProductInfoCallback.PICSProductInfo PICSAppInfo { get; set; }
public string GlobalAchievementPercentages { get; set; }
public string GetSchemaForGame { get; set; }
}
}
2 changes: 1 addition & 1 deletion Consumers/Messages/ChangeDataMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace SteamUpdater.Consumers.Messages
{
public class ChangeDataMessage
public class ChangeDataMessage : AbstractMessage
{
public SteamApps.PICSChangesCallback PICSChanges { get; set; }
}
Expand Down
2 changes: 1 addition & 1 deletion Consumers/Messages/PackageDataMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace SteamUpdater.Consumers.Messages
{
public class PackageDataMessage
public class PackageDataMessage : AbstractMessage
{
public PICSProductInfoCallback.PICSProductInfo PICSPackageInfo { get; set; }
}
Expand Down
2 changes: 1 addition & 1 deletion Consumers/Messages/ProfileDataMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace SteamUpdater.Consumers.Messages
{
public class ProfileDataMessage
public class ProfileDataMessage : AbstractMessage
{
public SteamFriends.ProfileInfoCallback ProfileInfo { get; set; }
}
Expand Down
27 changes: 20 additions & 7 deletions Consumers/PackageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,25 @@ namespace SteamUpdater.Consumers
{
public class PackageConsumer : AbstractConsumer
{
protected override async Task<bool> HandleMessage(BasicDeliverEventArgs msg)
protected override async Task<Tuple<bool, bool>> HandleMessage(BasicDeliverEventArgs msg)
{
var msgBody = Encoding.UTF8.GetString(msg.Body);
var ids = msgBody.Split(",");
var IDs = msgBody.Split(",");

if (ids.Length == 0)
if (msgBody.Length == 0 || IDs.Length == 0)
{
return true;
return new Tuple<bool, bool>(false, false);
}

var packageIDs = Array.ConvertAll(ids, Convert.ToUInt32);
// todo, take 100, requeue rest

var packageIDs = Array.ConvertAll(IDs, Convert.ToUInt32);
var JobID = Steam.steamApps.PICSGetProductInfo(new List<uint>(), packageIDs, false);
var callback = await JobID;

if (!callback.Complete)
{
return false;
return new Tuple<bool, bool>(false, true);
}

foreach (var result in callback.Results)
Expand All @@ -40,9 +42,20 @@ protected override async Task<bool> HandleMessage(BasicDeliverEventArgs msg)

Produce(queuePackagesData, JsonConvert.SerializeObject(message));
}

// Log unknowns
foreach (var entry in result.UnknownApps)
{
Log.GoogleInfo("Unknown app: " + entry);
}

foreach (var entry in result.UnknownPackages)
{
Log.GoogleInfo("Unknown package: " + entry);
}
}

return true;
return new Tuple<bool, bool>(true, false);
}
}
}
9 changes: 5 additions & 4 deletions Consumers/ProfileConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text;
using System;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using RabbitMQ.Client.Events;
Expand All @@ -9,13 +10,13 @@ namespace SteamUpdater.Consumers
{
public class ProfileConsumer : AbstractConsumer
{
protected override async Task<bool> HandleMessage(BasicDeliverEventArgs msg)
protected override async Task<Tuple<bool, bool>> HandleMessage(BasicDeliverEventArgs msg)
{
var msgBody = Encoding.UTF8.GetString(msg.Body);

if (msgBody.Length == 0)
{
return true;
return new Tuple<bool, bool>(false, false);
}

var id = new SteamID();
Expand All @@ -30,7 +31,7 @@ protected override async Task<bool> HandleMessage(BasicDeliverEventArgs msg)

Produce(queueProfilesData, JsonConvert.SerializeObject(message));

return true;
return new Tuple<bool, bool>(true, false);
}
}
}
8 changes: 3 additions & 5 deletions Steam.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ public static class Steam

public static bool quitOnDisconnect;
private static uint previousChangeNumber;
private static bool isLoggedOn;
public static bool isLoggedOn;

private static SteamClient steamClient;
public static SteamClient steamClient;
private static CallbackManager manager;

private static System.Timers.Timer timer1;
Expand Down Expand Up @@ -68,8 +68,6 @@ private static void RunWaitCallbacks(object obj, EventArgs args)

private static async void CheckForChanges(object obj, EventArgs args)
{
Console.Write(".");

timer2.Stop();

// Check logged in
Expand Down Expand Up @@ -151,7 +149,7 @@ private static void OnDisconnected(SteamClient.DisconnectedCallback callback)
}

// Try to reconnect
Thread.Sleep(TimeSpan.FromSeconds(15));
Thread.Sleep(TimeSpan.FromSeconds(10));
steamClient.Connect();
}

Expand Down

0 comments on commit 51b5c8f

Please sign in to comment.