DatatypeChannels.ASB is organized into 3 APIs:
- Channels API - connects to Azure Service Bus and provides constructors for publishers and consumers
- ToSend/OfReceived types for packing and unpacking messages in and out of Azure Service Bus primitives
- Publisher and Consumer types - the primary means of interaction
Channels is the entry point into the API. It can be constructed with a connection string or the FQDN of the namespace, for example:
open DatatypeChannels.ASB
use channels = Channels.fromFqdn "mynamespace.servicebus.windows.net"
                                 (Azure.Identity.DefaultAzureCredential false)
                                 (Log ignore) // no logging
For other construction parameters please see Channels.mkNew.
Internally, two connections might be established: one for consumer bindings management and one for data activities.
The connections are established just in time for the first use and are cached for the lifetime of the Channels factory.
Publisher and Consumer implement the opposite ends of a Datatype Channel, meaning that the type of the message is determined statically and for the entire lifetime of the channel.
Implement OfReceived and ToSend functions to convert from/to service bus primitives.
Here's an example of plain text converters:
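module PlainText =
    open System // BinaryData
    open Azure.Messaging.ServiceBus
    // unpack: read the received message body as text
    let ofReceived =
        fun (sbm: ServiceBusReceivedMessage) -> sbm.Body.ToString()
        |> OfReceived
    // pack: wrap the text into an outgoing message body
    let toSend =
        fun (msg: string) -> ServiceBusMessage(Body = BinaryData msg)
        |> ToSend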
Any exception during conversion from the bus primitives will result in the message being Nacked and sent to the dead-letter queue (if configured).
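The same pattern extends to structured payloads. The sketch below is one possible pair of JSON converters built on `BinaryData.FromObjectAsJson` and `BinaryData.ToObjectFromJson`. The `Json` module, the `OrderPlaced` record and the NuGet references are assumptions made for illustration and are not part of DatatypeChannels.ASB.

```fsharp
// Script-style package references (assumed package ids, latest versions)
#r "nuget: Azure.Messaging.ServiceBus"
#r "nuget: DatatypeChannels.ASB"

open System
open Azure.Messaging.ServiceBus
open DatatypeChannels.ASB

// Hypothetical message type, used only for this illustration
type OrderPlaced = { OrderId: string; Quantity: int }

module Json =
    // Unpack: deserialize the body from JSON; a malformed body throws here
    let ofReceived<'msg> : OfReceived<'msg> =
        OfReceived (fun (sbm: ServiceBusReceivedMessage) ->
            sbm.Body.ToObjectFromJson<'msg>())

    // Pack: serialize the message to JSON and tag the content type
    let toSend<'msg> : ToSend<'msg> =
        ToSend (fun (msg: 'msg) ->
            ServiceBusMessage(BinaryData.FromObjectAsJson msg,
                              ContentType = "application/json"))
```

Given the signatures of GetConsumer and GetPublisher, these would be passed the same way as the plain text converters, for example `channels.GetPublisher Json.toSend<OrderPlaced> (Topic "topic")`. Because deserialization can throw, a malformed body falls under the Nack behaviour described above.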
Publisher is just a function that takes a message and returns when the send has completed. There are two ways to obtain it:
let publisher = channels.GetPublisher PlainText.toSend (Topic "topic") // long-living publisher
Or, for immediate use and disposal of the publisher:
"test message" |-> channels.UsingPublisher PlainText.toSend (Topic "topic") // one-off publishing, using the infix operator
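To make the shape of one-off publishing concrete, here is a self-contained sketch that does not touch Azure at all. `Publisher`, `usingPublisher` and the `|->` operator below are local stand-ins written for this illustration. They mirror the signatures shown in this document, but they are not the library's implementation.

```fsharp
open System.Threading.Tasks

// Stand-in for the library's Publisher: a function from a message to a task
type Publisher<'msg> = 'msg -> Task<unit>

// Records what was "published" so the example can be inspected
let sent = ResizeArray<string>()

// Stand-in with the same shape as channels.UsingPublisher (after the converter
// and topic are applied): create a publisher, hand it to the callback, clean up.
let usingPublisher (topic: string) (callback: Publisher<string> -> Task<unit>) : Task<unit> =
    task {
        let publish : Publisher<string> =
            fun msg -> task { sent.Add $"{topic}: {msg}" }
        do! callback publish
        // a real implementation would dispose of its sender here
    }

// One plausible definition of an operator with the same shape as |->
let (|->) (msg: 'msg) (using: (Publisher<'msg> -> Task<unit>) -> Task<unit>) : Task<unit> =
    using (fun publish -> publish msg)

// One-off publishing; blocking with Wait() only to keep the demo short
("test message" |-> usingPublisher "topic").Wait()
printfn "%A" (List.ofSeq sent)
```

The long-living publisher is used the same way as `publish` above: call it with a message and await the returned task.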
Consumer implicitly manages the subscription every time it's created, updating forwarding and rules as needed.
A persistent queue can be set up using one or more subscriptions; each one will forward to the queue automatically.
Note: a known issue prevents idle detection on subscriptions that have auto-forwarding defined.
A Temporary queue can be useful for short-lived consumers. It is set up with auto-acknowledgments and a GUID for a name, and it is deleted once the consumer is disposed of.
For guaranteed processing, a Persistent queue or a Subscription with explicit Ack/Nack of the messages is recommended.
open System // TimeSpan
open Azure.Messaging.ServiceBus.Administration
// define a Subscription binding with no filters
let src = Subscription { Subscription = CreateSubscriptionOptions("mytopic", "mysub", MaxDeliveryCount = 1); Rule = None }
// create a consumer, specifying the conversion function from bus primitives
let consumer = channels.GetConsumer PlainText.ofReceived src
// another example of a source - deadletter queue of the subscription defined above
// this entity must exist, DatatypeChannels.ASB does not manage deadletter sources
let dlq = DeadLetter "mytopic/Subscriptions/mysub"
let dlqConsumer = channels.GetConsumer PlainText.ofReceived dlq
// define a Persistent queue binding (named so that it does not shadow the consumer above)
let persistent = Persistent (CreateQueueOptions("myqueue", LockDuration = TimeSpan.FromMinutes 6.),
                             [{ Subscription = CreateSubscriptionOptions("mytopic", "mysub", MaxDeliveryCount = 1); Rule = None }])
// define a Temporary queue binding, the messages will be auto-ack'ed
let tmp = Temporary [{ Subscription = CreateSubscriptionOptions("mytopic", "mysub", MaxDeliveryCount = 1); Rule = None }]
Once created, we can start polling them, specifying the timeout:
open FSharp.Control.Tasks.Builders
task {
    let! received = TimeSpan.FromSeconds 3. |> consumer.Get
    // for this example - negative acknowledge the message so that it's routed to the deadletters
    do! consumer.Nack received.Value.Id
    // now we can process it from the deadletters
    let! deadLettered = TimeSpan.FromSeconds 3. |> dlqConsumer.Get
    return deadLettered
}
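For guaranteed processing, the usual shape is to acknowledge only after the handler has succeeded and to negatively acknowledge otherwise. The sketch below shows that flow against an in-memory stand-in. The `Envelope` and `Consumer` records are local approximations of the shapes used above; in particular, the `Msg` field name and an `Ack` function mirroring `Nack` are assumptions. `processOne` is a hypothetical helper, not a library function.

```fsharp
open System
open System.Collections.Generic
open System.Threading.Tasks

// Local stand-ins; the real types come from DatatypeChannels.ASB
type Envelope<'msg> = { Id: string; Msg: 'msg } // Msg is an assumed field name
type Consumer<'msg> =
    { Get: TimeSpan -> Task<Envelope<'msg> option>
      Ack: string -> Task<unit>   // assumed to mirror Nack
      Nack: string -> Task<unit> }

// Poll once: Ack if the handler succeeds, Nack if it throws.
// Returns false when the poll timed out without a message.
let processOne (consumer: Consumer<'msg>) (handle: 'msg -> Task<unit>) =
    task {
        match! consumer.Get (TimeSpan.FromSeconds 3.) with
        | None -> return false
        | Some envelope ->
            let! succeeded =
                task {
                    try
                        do! handle envelope.Msg
                        return true
                    with _ -> return false
                }
            if succeeded then
                do! consumer.Ack envelope.Id
            else
                do! consumer.Nack envelope.Id
            return true
    }

// In-memory fake used to exercise the flow without a Service Bus namespace
let acked = ResizeArray<string>()
let nacked = ResizeArray<string>()
let pending =
    Queue<Envelope<string>>([ { Id = "1"; Msg = "ok" }; { Id = "2"; Msg = "boom" } ])
let fake : Consumer<string> =
    { Get = fun _ -> task { return (if pending.Count > 0 then Some (pending.Dequeue()) else None) }
      Ack = fun msgId -> task { acked.Add msgId }
      Nack = fun msgId -> task { nacked.Add msgId } }

// Handler that fails for one specific payload
let handle (msg: string) : Task<unit> =
    task { if msg = "boom" then failwith "cannot process" }

// Three polls: two messages, then an empty queue (blocking only for the demo)
let results = [ for _ in 1 .. 3 -> (processOne fake handle).Result ]
```

Keeping the handler's try/with separate from the Ack call means a failure while acknowledging is not mistaken for a processing failure.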
Note that:
- when using prefetch, the prefetch itself counts as a delivery attempt, whether or not you called Get
- if the expected message processing time, specified as LockDuration, exceeds the maximum lock time, DatatypeChannels.ASB will override the duration with the valid maximum and set up a background lock renewal task.