All Apache Thrift tutorials require that you have:
thrift -r --gen netstd tutorial.thrift
using Microsoft.Extensions.Logging;
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using Thrift.Transport.Client;
using tutorial;
#pragma warning disable IDE0057 // substr
namespace Client
{
public static class LoggingHelper
{
public static ILoggerFactory LogFactory { get; } = LoggerFactory.Create(builder => {
ConfigureLogging(builder);
});
public static void ConfigureLogging(ILoggingBuilder logging)
{
logging.SetMinimumLevel(LogLevel.Trace);
logging.AddConsole();
logging.AddDebug();
}
public static ILogger<T> CreateLogger<T>() => LogFactory.CreateLogger<T>();
}
public class Program
{
private static readonly ILogger Logger = LoggingHelper.CreateLogger<Program>();
private static readonly TConfiguration Configuration = new();
private static void DisplayHelp()
{
Logger.LogInformation(@"
Usage:
Client -help
will diplay help information
Client -tr:<transport> -bf:<buffering> -pr:<protocol> [-mc:<numClients>] [-multiplex]
will run client with specified arguments (tcp transport and binary protocol by default) and with 1 client
Options:
-tr (transport):
tcp - (default) tcp transport (localhost:9090)
tcptls - tcp tls transport (localhost:9090)
namedpipe - namedpipe transport (pipe "".test"")
http - http transport (http://localhost:9090)
-bf (buffering):
none - (default) no buffering
buffered - buffered transport
framed - framed transport
-pr (protocol):
binary - (default) binary protocol
compact - compact protocol
json - json protocol
-multiplex - adds multiplexed protocol
-mc (multiple clients):
<numClients> - number of multiple clients to connect to server (max 100, default 1)
Sample:
Client -tr:tcp -pr:binary
");
}
public static async Task Main(string[] args)
{
args ??= [];
// -help is rather unusual but we leave it for compatibility
if (args.Any(x => x.Equals("-help") || x.Equals("--help") || x.Equals("-h") || x.Equals("-?")))
{
DisplayHelp();
return;
}
Logger.LogInformation("Starting client...");
using var source = new CancellationTokenSource();
await RunAsync(args, source.Token);
}
private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
{
var numClients = GetNumberOfClients(args);
Logger.LogInformation("Selected # of clients: {numClients}", numClients);
var transport = GetTransport(args);
Logger.LogInformation("Selected client transport: {transport}", transport);
var protocol = MakeProtocol( args, MakeTransport(args));
Logger.LogInformation("Selected client protocol: {GetProtocol(args)}", GetProtocol(args));
var mplex = GetMultiplex(args);
Logger.LogInformation("Multiplex {mplex}", mplex);
var tasks = new Task[numClients];
for (int i = 0; i < numClients; i++)
{
var task = RunClientAsync(protocol, mplex, cancellationToken);
tasks[i] = task;
}
Task.WaitAll(tasks,cancellationToken);
await Task.CompletedTask;
}
private static bool GetMultiplex(string[] args)
{
var mplex = args.FirstOrDefault(x => x.StartsWith("-multiplex"));
return !string.IsNullOrEmpty(mplex);
}
private static Protocol GetProtocol(string[] args)
{
var protocol = args.FirstOrDefault(x => x.StartsWith("-pr"))?.Split(':').Skip(1).Take(1).FirstOrDefault();
if (string.IsNullOrEmpty(protocol))
return Protocol.Binary;
protocol = protocol.Substring(0, 1).ToUpperInvariant() + protocol.Substring(1).ToLowerInvariant();
if (Enum.TryParse(protocol, true, out Protocol selectedProtocol))
return selectedProtocol;
else
return Protocol.Binary;
}
private static Buffering GetBuffering(string[] args)
{
var buffering = args.FirstOrDefault(x => x.StartsWith("-bf"))?.Split(':').Skip(1).Take(1).FirstOrDefault();
if (string.IsNullOrEmpty(buffering))
return Buffering.None;
buffering = buffering.Substring(0, 1).ToUpperInvariant() + buffering.Substring(1).ToLowerInvariant();
if (Enum.TryParse<Buffering>(buffering, out var selectedBuffering))
return selectedBuffering;
else
return Buffering.None;
}
private static Transport GetTransport(string[] args)
{
var transport = args.FirstOrDefault(x => x.StartsWith("-tr"))?.Split(':').Skip(1).Take(1).FirstOrDefault();
if (string.IsNullOrEmpty(transport))
return Transport.Tcp;
transport = transport.Substring(0, 1).ToUpperInvariant() + transport.Substring(1).ToLowerInvariant();
if (Enum.TryParse(transport, true, out Transport selectedTransport))
return selectedTransport;
else
return Transport.Tcp;
}
private static TTransport MakeTransport(string[] args)
{
// construct endpoint transport
TTransport? transport = null;
Transport selectedTransport = GetTransport(args);
{
switch (selectedTransport)
{
case Transport.Tcp:
transport = new TSocketTransport(IPAddress.Loopback, 9090, Configuration);
break;
case Transport.NamedPipe:
transport = new TNamedPipeTransport(".test", Configuration);
break;
case Transport.Http:
transport = new THttpTransport(new Uri("http://localhost:9090"), Configuration);
break;
case Transport.TcpTls:
transport = new TTlsSocketTransport(IPAddress.Loopback, 9090, Configuration,
GetCertificate(), CertValidator, LocalCertificateSelectionCallback);
break;
default:
Debug.Assert(false, "unhandled case");
break;
}
}
// optionally add layered transport(s)
Buffering selectedBuffering = GetBuffering(args);
switch (selectedBuffering)
{
case Buffering.Buffered:
transport = new TBufferedTransport(transport);
break;
case Buffering.Framed:
transport = new TFramedTransport(transport);
break;
default: // layered transport(s) are optional
Debug.Assert(selectedBuffering == Buffering.None, "unhandled case");
break;
}
return transport;
}
private static int GetNumberOfClients(string[] args)
{
var numClients = args.FirstOrDefault(x => x.StartsWith("-mc"))?.Split(':').Skip(1).Take(1).FirstOrDefault();
Logger.LogInformation("Selected # of clients: {numClients}", numClients);
if (int.TryParse(numClients, out int c) && (0 < c) && (c <= 100))
return c;
else
return 1;
}
private static X509Certificate2 GetCertificate()
{
// due to files location in net core better to take certs from top folder
var dir = Directory.GetParent(Directory.GetCurrentDirectory());
if (dir != null)
{
var certFile = GetCertPath(dir);
//return new X509Certificate2(certFile, "ThriftTest");
return X509CertificateLoader.LoadPkcs12FromFile(certFile, "ThriftTest");
}
else
{
Logger.LogError("Root path of {path} not found", Directory.GetCurrentDirectory());
throw new Exception($"Root path of {Directory.GetCurrentDirectory()} not found");
}
}
private static string GetCertPath(DirectoryInfo? di, int maxCount = 6)
{
var topDir = di;
var certFile = topDir?.EnumerateFiles("ThriftTest.pfx", SearchOption.AllDirectories).FirstOrDefault();
if (certFile == null)
{
if (maxCount == 0)
throw new FileNotFoundException("Cannot find file in directories");
return GetCertPath(di?.Parent, --maxCount);
}
return certFile.FullName;
}
private static X509Certificate2 LocalCertificateSelectionCallback(object sender,
string targetHost, X509CertificateCollection localCertificates,
X509Certificate? remoteCertificate, string[] acceptableIssuers)
{
return GetCertificate();
}
private static bool CertValidator(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors)
{
return true;
}
private static TProtocol MakeProtocol(string[] args, TTransport transport)
{
Protocol selectedProtocol = GetProtocol(args);
return selectedProtocol switch
{
Protocol.Binary => new TBinaryProtocol(transport),
Protocol.Compact => new TCompactProtocol(transport),
Protocol.Json => new TJsonProtocol(transport),
_ => throw new Exception("unhandled protocol"),
};
}
private static async Task RunClientAsync(TProtocol protocol, bool multiplex, CancellationToken cancellationToken)
{
try
{
try
{
if( multiplex)
protocol = new TMultiplexedProtocol(protocol, nameof(Calculator));
var client = new Calculator.Client(protocol);
await ExecuteCalculatorClientOperations(client, cancellationToken);
}
catch (Exception ex)
{
Logger.LogError("{ex}",ex);
}
finally
{
protocol.Transport.Close();
}
}
catch (TApplicationException x)
{
Logger.LogError("{x}",x);
}
}
private static async Task ExecuteCalculatorClientOperations( Calculator.Client client, CancellationToken cancellationToken)
{
await client.OpenTransportAsync(cancellationToken);
// Async version
Logger.LogInformation("{client.ClientId} Ping()", client.ClientId);
await client.ping(cancellationToken);
Logger.LogInformation("{client.ClientId} Add(1,1)", client.ClientId);
var sum = await client.add(1, 1, cancellationToken);
Logger.LogInformation("{client.ClientId} Add(1,1)={sum}", client.ClientId, sum);
var work = new Work
{
Op = Operation.DIVIDE,
Num1 = 1,
Num2 = 0
};
try
{
Logger.LogInformation("{client.ClientId} Calculate(1)", client.ClientId);
await client.calculate(1, work, cancellationToken);
Logger.LogInformation("{client.ClientId} Whoa we can divide by 0", client.ClientId);
}
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using shared;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Thrift;
using Thrift.Processor;
using Thrift.Protocol;
using Thrift.Server;
using Thrift.Transport;
using Thrift.Transport.Server;
using tutorial;
#pragma warning disable IDE0057 // substr
namespace Server
{
public static class LoggingHelper
{
public static ILoggerFactory LogFactory { get; } = LoggerFactory.Create(builder => {
ConfigureLogging(builder);
});
public static void ConfigureLogging(ILoggingBuilder logging)
{
logging.SetMinimumLevel(LogLevel.Trace);
logging.AddConsole();
logging.AddDebug();
}
public static ILogger<T> CreateLogger<T>() => LogFactory.CreateLogger<T>();
}
public class Program
{
private static readonly ILogger Logger = LoggingHelper.CreateLogger<Program>();
private static readonly TConfiguration Configuration = new();
public static async Task Main(string[] args)
{
args ??= [];
// -help is rather unusual but we leave it for compatibility
if (args.Any(x => x.Equals("-help") || x.Equals("--help") || x.Equals("-h") || x.Equals("-?")))
{
DisplayHelp();
return;
}
using var source = new CancellationTokenSource();
await RunAsync(args, source.Token);
Logger.LogInformation("Press any key to stop...");
Console.ReadLine();
source.Cancel();
Logger.LogInformation("Server stopped");
}
private static void DisplayHelp()
{
Logger.LogInformation(@"
Usage:
Server -help
will diplay help information
Server -tr:<transport> -bf:<buffering> -pr:<protocol> [-multiplex]
will run server with specified arguments (tcp transport, no buffering, and binary protocol by default)
Options:
-tr (transport):
tcp - (default) tcp transport (localhost:9090)
tcptls - tcp transport with tls (localhost:9090)
namedpipe - namedpipe transport (pipe "".test"")
http - http transport (localhost:9090)
-bf (buffering):
none - (default) no buffering
buffered - buffered transport
framed - framed transport
-pr (protocol):
binary - (default) binary protocol
compact - compact protocol
json - json protocol
-multiplex - adds multiplexed protocol
Sample:
Server -tr:tcp
");
}
private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
{
var selectedTransport = GetTransport(args);
var selectedBuffering = GetBuffering(args);
var selectedProtocol = GetProtocol(args);
var multiplex = GetMultiplex(args);
if (selectedTransport == Transport.Http)
{
if (multiplex)
throw new Exception("This tutorial sample code does not yet allow multiplex over http (although Thrift itself of course does)");
new HttpServerSample().Run(cancellationToken);
}
else
{
await RunSelectedConfigurationAsync(selectedTransport, selectedBuffering, selectedProtocol, multiplex, cancellationToken);
}
}
private static bool GetMultiplex(string[] args)
{
var mplex = args.FirstOrDefault(x => x.StartsWith("-multiplex"));
return !string.IsNullOrEmpty(mplex);
}
private static Protocol GetProtocol(string[] args)
{
var protocol = args.FirstOrDefault(x => x.StartsWith("-pr"))?.Split(':').Skip(1).Take(1).FirstOrDefault();
if (string.IsNullOrEmpty(protocol))
return Protocol.Binary;
protocol = protocol.Substring(0, 1).ToUpperInvariant() + protocol.Substring(1).ToLowerInvariant();
if (Enum.TryParse(protocol, true, out Protocol selectedProtocol))
return selectedProtocol;
else
return Protocol.Binary;
}
private static Buffering GetBuffering(string[] args)
{
var buffering = args.FirstOrDefault(x => x.StartsWith("-bf"))?.Split(':').Skip(1).Take(1).FirstOrDefault();
if (string.IsNullOrEmpty(buffering))
return Buffering.None;
buffering = buffering.Substring(0, 1).ToUpperInvariant() + buffering.Substring(1).ToLowerInvariant();
if( Enum.TryParse<Buffering>(buffering, out var selectedBuffering))
return selectedBuffering;
else
return Buffering.None;
}
private static Transport GetTransport(string[] args)
{
var transport = args.FirstOrDefault(x => x.StartsWith("-tr"))?.Split(':').Skip(1).Take(1).FirstOrDefault();
if (string.IsNullOrEmpty(transport))
return Transport.Tcp;
transport = transport.Substring(0, 1).ToUpperInvariant() + transport.Substring(1).ToLowerInvariant();
if( Enum.TryParse(transport, true, out Transport selectedTransport))
return selectedTransport;
else
return Transport.Tcp;
}
private static async Task RunSelectedConfigurationAsync(Transport transport, Buffering buffering, Protocol protocol, bool multiplex, CancellationToken cancellationToken)
{
TServerTransport serverTransport = transport switch
{
Transport.Tcp => new TServerSocketTransport(9090, Configuration),
Transport.NamedPipe => new TNamedPipeServerTransport(".test", Configuration, NamedPipeServerFlags.None, 64),
Transport.TcpTls => new TTlsServerSocketTransport(9090, Configuration, GetCertificate(), ClientCertValidator, LocalCertificateSelectionCallback),
_ => throw new ArgumentException("unsupported value $transport", nameof(transport)),
};
TTransportFactory? transportFactory = buffering switch
{
Buffering.Buffered => new TBufferedTransport.Factory(),
Buffering.Framed => new TFramedTransport.Factory(),
// layered transport(s) are optional
Buffering.None => null,
_ => throw new ArgumentException("unsupported value $buffering", nameof(buffering)),
};
TProtocolFactory protocolFactory = protocol switch
{
Protocol.Binary => new TBinaryProtocol.Factory(),
Protocol.Compact => new TCompactProtocol.Factory(),
Protocol.Json => new TJsonProtocol.Factory(),
_ => throw new ArgumentException("unsupported value $protocol", nameof(protocol)),
};
var handler = new CalculatorAsyncHandler();
ITAsyncProcessor processor = new Calculator.AsyncProcessor(handler);
if (multiplex)
{
var multiplexedProcessor = new TMultiplexedProcessor();
multiplexedProcessor.RegisterProcessor(nameof(Calculator), processor);
processor = multiplexedProcessor;
}
try
{
Logger.LogInformation(
"TSimpleAsyncServer with \n{transport} transport\n{buffering} buffering\nmultiplex = {multiplex}\n{protocol} protocol",
transport,
buffering,
multiplex ? "yes" : "no",
protocol
);
var server = new TSimpleAsyncServer(
itProcessorFactory: new TSingletonProcessorFactory(processor),
serverTransport: serverTransport,
inputTransportFactory: transportFactory,
outputTransportFactory: transportFactory,
inputProtocolFactory: protocolFactory,
outputProtocolFactory: protocolFactory,
logger: LoggingHelper.CreateLogger<TSimpleAsyncServer >());
Logger.LogInformation("Starting the server...");
await server.ServeAsync(cancellationToken);
}
catch (Exception x)
{
Logger.LogInformation("{x}",x);
}
}
private static X509Certificate2 GetCertificate()
{
// due to files location in net core better to take certs from top folder
var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
//return new X509Certificate2(certFile, "ThriftTest");
return X509CertificateLoader.LoadPkcs12FromFile(certFile, "ThriftTest");
}
private static string GetCertPath(DirectoryInfo? di, int maxCount = 6)
{
var topDir = di;
var certFile = topDir?.EnumerateFiles("ThriftTest.pfx", SearchOption.AllDirectories).FirstOrDefault();
if (certFile == null)
{
if (maxCount == 0)
throw new FileNotFoundException("Cannot find file in directories");
return GetCertPath(di?.Parent, --maxCount);
}
return certFile.FullName;
}
private static X509Certificate2 LocalCertificateSelectionCallback(object sender,
string targetHost, X509CertificateCollection localCertificates,
X509Certificate? remoteCertificate, string[] acceptableIssuers)
{
return GetCertificate();
}
private static bool ClientCertValidator(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslPolicyErrors)
{
return true;
}
private enum Transport
{
Tcp,
NamedPipe,
Http,
TcpTls,
}
private enum Buffering
{
None,
Buffered,
Framed,
}
private enum Protocol
{
Binary,
Compact,
Json,
}
public class HttpServerSample
{
public void Run(CancellationToken cancellationToken)
{
var config = new ConfigurationBuilder()
.AddEnvironmentVariables(prefix: "ASPNETCORE_")
.Build();
var host = new WebHostBuilder()
.UseConfiguration(config)
.UseKestrel()
.UseUrls("http://localhost:9090")
.UseContentRoot(Directory.GetCurrentDirectory())
.UseStartup<Startup>()
.ConfigureLogging((ctx,logging) => LoggingHelper.ConfigureLogging(logging))
.Build();
Logger.LogTrace("test");
Logger.LogCritical("test");
host.RunAsync(cancellationToken).GetAwaiter().GetResult();
}
public class Startup
{
public Startup(IWebHostEnvironment env)
{
var builder = new ConfigurationBuilder()
.SetBasePath(env.ContentRootPath)
.AddEnvironmentVariables();
Configuration = builder.Build();
}
public IConfigurationRoot Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
// NOTE: this is not really the recommended way to do it
// because the HTTP server cannot be configured properly to e.g. accept framed or multiplex
services.AddTransient<Calculator.IAsync, CalculatorAsyncHandler>();
services.AddTransient<ITAsyncProcessor, Calculator.AsyncProcessor>();
services.AddTransient<THttpServerTransport, THttpServerTransport>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerFactory loggerFactory)
{
_ = env;
_ = loggerFactory;
app.UseMiddleware<THttpServerTransport>();
}
}
}
public class CalculatorAsyncHandler : Calculator.IAsync
{
private readonly Dictionary<int, SharedStruct> _log = [];
public CalculatorAsyncHandler()
{
}
public async Task<SharedStruct> getStruct(int key,
CancellationToken cancellationToken)
{
Logger.LogInformation("GetStruct({key})", key);
return await Task.FromResult(_log[key]);
}
public async Task ping(CancellationToken cancellationToken)
{
Logger.LogInformation("Ping()");
await Task.CompletedTask;
}
public async Task<int> add(int num1, int num2, CancellationToken cancellationToken)
{
Logger.LogInformation("Add({num1},{num2})", num1, num2);
return await Task.FromResult(num1 + num2);
}
public async Task<int> calculate(int logid, Work? w, CancellationToken cancellationToken)
{
Logger.LogInformation("Calculate({logid}, [{w.Op},{w.Num1},{w.Num2}])", logid, w?.Op, w?.Num1, w?.Num2);
int val;
switch (w?.Op)
{
case Operation.ADD:
val = w.Num1 + w.Num2;
break;
case Operation.SUBTRACT:
val = w.Num1 - w.Num2;
break;
case Operation.MULTIPLY:
val = w.Num1 * w.Num2;
break;
case Operation.DIVIDE:
if (w.Num2 == 0)
{
var io = new InvalidOperation
{
WhatOp = (int) w.Op,
Why = "Cannot divide by 0"
};
throw io;
}
val = w.Num1 / w.Num2;
break;
default:
{
var io = new InvalidOperation
{
WhatOp = ((int?)w?.Op) ?? -1,
Why = "Unknown operation"