Effective CQRS in Azure Functions with MediatR and CosmosDB

The article explores the integration of various cutting-edge technologies in modern software development. It delves into the implementation of Command Query Responsibility Segregation (CQRS) architecture, leveraging Azure Functions for serverless computing, and Azure Cosmos DB for globally distributed database management.

The .NET SDK serves as the foundational framework, facilitating seamless integration and development. Additionally, the article discusses the utilization of the Mediator pattern for simplifying communication between components.

Furthermore, it highlights the advantages of polymorphic JSON serialization for flexible data handling. By incorporating these technologies, developers can build scalable, resilient, and efficient software solutions tailored to modern application requirements.

What is CQRS?

Command Query Responsibility Segregation (CQRS) is a design pattern separating the responsibilities of reading data (queries) from those of writing data (commands) in software systems. CQRS enhances scalability, performance, and maintainability by decoupling the read and write operations. It allows the optimization of each side independently, employing different models tailored to specific requirements. Advantages include improved scalability through optimized read and write paths, better performance by tailoring data retrieval strategies, enhanced maintainability with clearer separation of concerns, and flexibility to choose appropriate data storage and retrieval mechanisms. Additionally, CQRS facilitates complex domain modeling, fostering cleaner, more maintainable codebases.

What are Azure Functions and Azure CosmosDB?

Azure Functions is a serverless compute service offered by Microsoft Azure, enabling developers to build event-driven applications and microservices without managing infrastructure. Developers can write functions in various languages like C#, JavaScript, Python, and more, triggered by events from Azure services or external sources like HTTP requests or message queues. With automatic scaling and pay-per-use pricing, Azure Functions offer cost-effective scalability and resource optimization. It supports integration with other Azure services like Azure Storage, Azure Cosmos DB, and Azure Event Hubs, facilitating seamless development of cloud-native applications. Azure Functions streamline development, allowing a focus on business logic rather than infrastructure management.

Azure Cosmos DB is a globally distributed, multi-model database service provided by Microsoft Azure. It offers high availability, low latency, and scalability, making it suitable for a wide range of applications. With support for multiple data models including document, key-value, graph, and column family, Cosmos DB provides flexibility for diverse data storage needs. Its global distribution and multi-master replication enable data access from any geographical region with consistency options tailored to application requirements. Cosmos DB offers automatic indexing, native integration with Azure services, and comprehensive SLAs for throughput, latency, and availability, making it a robust choice for modern, globally distributed, and high-performance applications.

Techstack and Architecture Design Patterns

Techstack and Packages

  • .net 8
  • MediatR
  • CosmosDb
  • Azure latest .net SDK

Design / Architecture Patterns

Repository Pattern

The repository design pattern is a structural pattern used in software development to separate data access logic from business logic. It provides a centralized interface for accessing data stored in a database or other data sources, promoting modularity, flexibility, and easier maintenance of the application codebase.

Mediator Pattern

The mediator design pattern facilitates communication between objects by encapsulating their interactions within a mediator object. Objects don’t directly communicate but instead send messages to the mediator, relaying them to other objects as needed. This promotes loose coupling and simplifies the maintenance of complex interactions between multiple objects.

Project Overview

The solution is divided in 4 separate projects.

  • Api.AzureFunctions
  • Api.CommandHandlers
  • Api.Commands
  • Api.Persistence

Each project implements a specific logical part. Further below we will go more into detail of the implementations. The azure function project will implement the final code which will be deployed to the Azure function app. The Handler and Commands project will implement the typical IMediatr classes and interfaces. The Commands project can also be seen as a “entities” project, as these objects will be persisted in the Azure Cosmos DB. The Persistence project will have the detailed implementation how to connect to Azure Cosmos DB.

The following images shows the projects dependency graph. Note the similarity to the Clean Code Architecture.

project dependency structure

Commands Project Implementation

ICommand marker interface

using MediatR;

namespace Api.Commands;

public interface ICommand : IRequest
{

}

public interface ICommand<out TResult> : IRequest<TResult>
{

}

Operationbase Class

Hot to serialize / deserialize polymorphic objects with .net system json library?

  1. Define Base Class: Create an abstract base class with properties and methods common to all derived types.
  2. Apply JsonDerivedType Attribute: Decorate the base class with the JsonDerivedType attribute, specifying the derived types along with their corresponding type identifiers.
  3. Implement Derived Classes: Create derived classes that inherit from the base class.
  4. Serialization: Use JsonSerializer to serialize objects of the base class or its derived types.
  5. Deserialization: Deserialize JSON data back into objects of the base class or its derived types.

By adhering to these steps, you can effectively utilize the JsonDerivedType attribute to handle polymorphic serialization and deserialization within the System.Text.Json library in .NET.

The base implementation of the Operation class

using System.Text.Json.Serialization;

namespace Api.Commands;

[JsonDerivedType(typeof(RotationOperation), typeDiscriminator: nameof(RotationOperation))]
// .. more implementations
public abstract class Operation {
    public Guid Id {get; set;}

   // ...
}

RotationOperation as an explicit class

An example implementation of an concrete class. Note that the ICommand interface is implemented as well. This will tell MediatR that this object can be send processed with and request handler.

namespace Api.Entities;

public class RotationOperation : Operation, ICommand {
    // specific fields
}

Persistence Project Implementation

The persistence project contains logic how to connect with the CosmosDB storage. It is implemented with a generic repository pattern approach. Only the Get method is implemented as an example.

Generic interface

namespace Api.Persistence;

public interface ICrudRepository<TEntity> {

    public Task<Result<TEntity>> GetAsync(Guid id);
}

Explicit operation interface

using Api.Entities;

namespace Api.Persistence;

public interface IOperationRepository : ICrudRepository<Operation> {


}

Generic implementation

using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Azure.Cosmos;

namespace Api.Persistence;

public class BaseCosmosDbRepository <TEntity> : ICrudRepository<TEntity> where TEntity : class
{
    private readonly Container _container;
    private readonly CosmosClient _cosmosClient;

    public BaseCosmosDbRepository(CosmosClient cosmosClient, string containerName)
    {
       _cosmosClient = cosmosClient;
        var database = _cosmosClient.GetDatabase("ipa");
        _container = database.GetContainer(containerName);
    }

    public async Task<Result<TEntity>> GetAsync(Guid id)
    {
        var result = await _container.ReadItemAsync<TEntity>(id.ToString(), PartitionKey.None);
        return Result<TEntity>.Ok(result.Resource);
    }
}

Explicit implementation

using Api.Commands;
using Microsoft.Azure.Cosmos;


namespace Api.Persistence;

public class OperationRepository : BaseCosmosDbRepository<Operation>, IOperationRepository
{
    public OperationRepository(CosmosClient cosmosClient) : base(cosmosClient, "Operation")
    {

    }
}

You will notice that the operation repository is only implemented for the abstract base type (hence the RotationOperationRepository does not exist). The goal is that at runtime the json deserializer will notice which concrete type of the operation class must be created.

CosmosDB Inheritance with .net System.Text.Json Library

In order to configure the correct json serialization we must override the current serializer. We have created a new class CosmosSystemTextJsonSerializer which takes the correct serialization options. Note also that the CosmosDB client is implemented as singleton as it is current best practice for Azure services.

Dependency injection and configuration

using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;



namespace Api.Persistence;


public static class ProgramExtensions {

    public static IServiceCollection AddAzureServices(this IServiceCollection services) 
    {
        services.AddOptions<AzureCredentialsConfiguration>().Configure<IConfiguration>((settings, configuration) => {
            configuration.GetSection("AzureCredentials").Bind(settings);
        });

        services.AddSingleton<CosmosClient>(s => {
            var config = s.GetRequiredService<IOptions<AzureCredentialsConfiguration>>();

            var jsonSerializerOptions = new JsonSerializerOptions()
            {
                DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase
            };

            var cosmosSystemTextJsonSerializer = new CosmosSystemTextJsonSerializer(jsonSerializerOptions);

            CosmosClientOptions options = new()
            {
                Serializer = cosmosSystemTextJsonSerializer,
            };

            var client = new CosmosClient(
                accountEndpoint: config.Value.CosmosEndpoint,
                authKeyOrResourceToken: config.Value.CosmosAuthKey,
                clientOptions: options
            );

            return client;
        });

        services.AddScoped<IOperationRepository, Ipa.Api.Persistence.Azure.OperationRepository>();

        return services;
    }
}

Custom CosmosSystemTextJsonSerializer implementation

using System.Text.Json;
using Microsoft.Azure.Cosmos;

namespace Api.Persistence.Azure;

public class CosmosSystemTextJsonSerializer : CosmosSerializer
{
    private readonly JsonSerializerOptions? _serializerOptions;

    public CosmosSystemTextJsonSerializer() => _serializerOptions = null;

    public CosmosSystemTextJsonSerializer(JsonSerializerOptions serializerOptions) =>
        this._serializerOptions = serializerOptions;

    public override T FromStream<T>(Stream stream)
    {
        using (stream)
        {
            if (typeof(Stream).IsAssignableFrom(typeof(T)))
            {
                return (T)(object)stream;
            }

            return JsonSerializer.DeserializeAsync<T>(stream, _serializerOptions).GetAwaiter().GetResult();
        }
    }

    public override Stream ToStream<T>(T input)
    {
        var outputStream = new MemoryStream();

        JsonSerializer.SerializeAsync<T>(outputStream, input, _serializerOptions).GetAwaiter().GetResult();

        outputStream.Position = 0;
        return outputStream;
    }
}

CommandHandler Project Implementation

The command handler project will provide the business logic about how to handle the specific commands.

ApiCommandHandler implementation

using Api.Commands;
using Api.Persistence;
using Microsoft.Extensions.Logging;

namespace Api.CommandHandlers;

public class ApiCommandHandler
{
    private readonly IMediator_mediator;

    public ApiCommandHandler(IMediator mediator) 
    {
        _mediator = mediator;
    }

    Task Handle(Guid operationId) 
    {
        try {
            var operationResult = await _operationRepository.GetAsync(id);

            if(!operationResult .IsSuccess || operationResult .Data == null) {
                // handle not found
            }

            var operation = operationResult .Data;
            await _mediator.Send(operation!);
        } catch(Exception ex) {
            // handle error
        }
    }
}

ICommandHandler marker interface

using MediatR;

namespace Api.CommandHandlers;

public interface ICommandHandler<in TRequest, TResult> : IRequestHandler<TRequest, TResult>
  where TRequest : ICommand<TResult>
{

}

public interface ICommandHandler<in TRequest> : IRequestHandler<TRequest>
  where TRequest : ICommand
{

}

Concrete implementation of the RotationOperation Request Handler

using Api.Commands;
using Api.Persistence;
using Microsoft.Extensions.Logging;

namespace Api.CommandHandlers;

public class RotationOperationHandler : ICommandHandler<RotationOperation>
{
    public RotationOperationHandler()
    {
    }

    public Task Handle(RotationOperation request, CancellationToken cancellationToken)
    {
        // implementation
    }
}

Azure Functions Project Implementation

Azure function glue code

Code contains only logic how to be triggered via the Azure infrastructure. It directly forwards the necessary information to the command handler classes.

using System.Net;
using Azure;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;


namespace Api.AzureFunction
{
    public class OperationHandler
    {
        private readonly ILogger _logger;

        private readonly ApiCommandHandler _commandHandler;

        public ipa_operation_handler(ILoggerFactory loggerFactory, ApiCommandHandler commandHandler)
        {
            _logger = loggerFactory.CreateLogger<ipa_operation_handler>();
            _commandHandler = commandHandler;
        }

        [Function("cosmosdbTrigger")]
        public async Task Run([CosmosDBTrigger(
            databaseName: "ipa",
            containerName: "Operation",
            Connection = "storageConnection",
            LeaseContainerName = "leases",
            CreateLeaseContainerIfNotExists = true)] IReadOnlyList<Ipa.Api.Entities.Operation> input)
        {
            _logger.LogInformation("Cosmos DB trigger");

            if(input == null) return;

            foreach(var item in newItems) {
                await _commandHandler.Handle(item.Id)
            }
        }
    }
}

Dependency injection and configuration

using Api.Commands;
using Api.CommandHandlers;
using Api.AzureFunctions;
using Api.Persistence;
using MediatR;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()

    .ConfigureServices(services => {
        services.AddAzureServices();

        services.AddMediatR(cfg =>
        {
            cfg.RegisterServicesFromAssemblies(AzureFunctionAssembly.Get(), CommandsAssembly.Get(), CommandHandlersAssembly.Get());
        });
    })
    .Build();

host.Run();

Conclusion

In conclusion, adopting a cloud-agnostic approach ensures that cloud-specific code remains either abstracted or encapsulated behind interfaces, enhancing portability and reducing vendor lock-in. By employing patterns like command handlers in Azure Functions, new functionalities can be seamlessly integrated without altering the existing codebase. Domain-specific logic remains independent of cloud services, facilitating smooth transitions between different cloud providers like Azure and AWS. This strategy not only enhances flexibility but also simplifies maintenance and future scalability, enabling businesses to adapt to evolving requirements and leverage the best offerings across various cloud platforms.

Links