DatatypeChannels.ASB implements the Datatype Channel pattern on top of the official Azure Service Bus clients.
A channel is an abstraction on top of Service Bus entities and it's implemented using topics, subscriptions and queues.
The core idea is that while there are many channels, all carrying messages, every channel is dedicated to a single type of message.
It works under the following assumptions:
- the bus topology is shared among heterogeneous players; it's not a private implementation detail
- we can capture the message type via the topic + subscription rules
- the consumer is long-lived and handles only one type of message
- the consumer decides when to pull the next message of a subscription or a queue
- the publishers can be long- or short-lived
- we have multiple serialization formats and may want to add new ones easily; each new format may introduce a new channel
- we never want to lose a message; a message that a channel receives but is unable to read should be dead-lettered
- we control the receiving side of the bus topology and have 'Manage' permissions to create subscriptions and queues as necessary
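The core idea and the dead-lettering assumption can be illustrated with a small in-memory model. This is only a sketch: `Raw`, `Channel`, `deliver`, `parseInt` and `intChannel` are hypothetical names made up for the illustration and are not part of the DatatypeChannels.ASB API.

```fsharp
// Illustrative in-memory model only; none of these names come from DatatypeChannels.ASB.

/// A raw message as it might arrive from the bus: a content type tag plus a body.
type Raw = { ContentType: string; Body: string }

/// A hypothetical channel dedicated to a single message type 'msg.
type Channel<'msg> =
    { OfReceived: Raw -> 'msg option    // conversion from the bus primitive
      DeadLetters: ResizeArray<Raw> }   // where unreadable messages end up

/// Deliver a raw message: it either converts to 'msg or is dead-lettered, never dropped.
let deliver (channel: Channel<'msg>) (raw: Raw) : 'msg option =
    match channel.OfReceived raw with
    | Some msg -> Some msg
    | None ->
        channel.DeadLetters.Add raw
        None

/// Conversion for a channel that carries only integers encoded as plain text.
let parseInt (raw: Raw) : int option =
    match System.Int32.TryParse(raw.Body) with
    | true, n when raw.ContentType = "text/plain" -> Some n
    | _ -> None

let intChannel : Channel<int> =
    { OfReceived = parseInt; DeadLetters = ResizeArray() }

// A readable message is converted; an unreadable one lands in the dead-letter list.
let ok = deliver intChannel { ContentType = "text/plain"; Body = "42" }
let bad = deliver intChannel { ContentType = "application/json"; Body = "{}" }
printfn "converted: %A, dead-lettered: %d" ok intChannel.DeadLetters.Count
```

Because the conversion function is fixed per channel, supporting another serialization format in this model means creating another channel rather than teaching an existing consumer a second format.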
The DatatypeChannels.ASB library can be installed from NuGet:
dotnet add package DatatypeChannels.ASB
This example demonstrates a complete roundtrip over a channel using the DatatypeChannels.ASB API:
open System
open FSharp.Control.Tasks.Builders
open Azure.Messaging.ServiceBus.Administration
open DatatypeChannels.ASB

// create the entry point - DatatypeChannels
let channels =
    Channels.fromFqdn "mynamespace.servicebus.windows.net"
                      (Azure.Identity.DefaultAzureCredential false)
                      (Log ignore) // no logging

// define the "source" - subscription and routing rules, together called "binding"
let src = Subscription { Subscription = CreateSubscriptionOptions("mytopic", "mysub", MaxDeliveryCount = 1), Rule = None }

// create a consumer, specifying the conversion function from bus primitives
let consumer = channels.GetConsumer PlainText.ofReceived src // see the tutorial for details

// create a publisher, specifying the target topic and the conversion function to bus primitives
let publisher = channels.GetPublisher PlainText.toSend (Topic "mytopic") // see the tutorial for details

Threading.Thread.Sleep 5_000 // magic number - this is how long it takes the topic to start routing messages to new subscriptions

task {
    do! publisher |> Publisher.publish "test-payload"
    let! received = TimeSpan.FromSeconds 3. |> consumer.Get
    printfn "Received: %A" received
    // Get returns None if nothing arrives within the timeout, so only acknowledge an actual message
    match received with
    | Some envelope -> do! consumer.Ack envelope.Id
    | None -> ()
}
Note that the API is task-based and the bindings are defined using Azure.Messaging.ServiceBus.Administration types.
Constructing a consumer establishes the connection and creates the subscription, or updates an existing one to match the specified rule.
Other types of consumer sources are DeadLetter, Queue and Temporary.
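Since the consumer decides when to pull the next message, a long-lived consumer would typically wrap the get and acknowledge calls in a loop. The sketch below shows one possible shape for such a loop. It assumes F# 6 or later for the built-in `task` builder, and it takes `get` and `ack` as plain functions with the same shapes as `consumer.Get` and `consumer.Ack`, so it runs here against in-memory stand-ins. `pump`, `idOf`, `handle`, `maxMessages`, `inbox` and `acked` are hypothetical names introduced only for this illustration.

```fsharp
open System
open System.Threading.Tasks

/// Pull messages one at a time until the source stays empty for the timeout
/// or `maxMessages` have been handled. Returns the number of handled messages.
/// `get` and `ack` mirror the shapes of consumer.Get and consumer.Ack;
/// `idOf` and `handle` are hypothetical parameters introduced for this sketch.
let pump (get: TimeSpan -> Task<'env option>)
         (ack: string -> Task<unit>)
         (idOf: 'env -> string)
         (handle: 'env -> unit)
         (maxMessages: int) : Task<int> =
    let rec loop handled =
        task {
            if handled >= maxMessages then
                return handled
            else
                let! next = get (TimeSpan.FromSeconds 3.)
                match next with
                | Some envelope ->
                    handle envelope
                    do! ack (idOf envelope) // acknowledge only after handling succeeded
                    return! loop (handled + 1)
                | None ->
                    return handled // nothing arrived within the timeout
        }
    loop 0

// In-memory stand-ins for a real consumer: (id, body) pairs and a list of acknowledged ids.
let inbox = System.Collections.Generic.Queue<string * string>([ "id-1", "hello"; "id-2", "world" ])
let acked = ResizeArray<string>()

let get (_: TimeSpan) =
    task { return (if inbox.Count > 0 then Some(inbox.Dequeue()) else None) }

let ack (id: string) = task { acked.Add id }

let handledCount =
    (pump get ack fst (fun (_, body) -> printfn "Received: %s" body) 10).Result
```

With a real consumer, the same function could be called with `consumer.Get`, `consumer.Ack` and a function that reads the envelope's `Id`. Acknowledging only after `handle` returns means that a failure in the handler leaves the message unacknowledged instead of losing it.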
The project is hosted on GitHub where you can report issues, fork
the project and submit pull requests.
The library is available under the MIT license, which allows modification and
redistribution for both commercial and non-commercial purposes. For more information see the
License file in the GitHub repository.