DatatypeChannels.ASB


DatatypeChannels.ASB is organized into 3 APIs: Channels, Publisher and Consumer.

Channels API

Channels is the entry point into the API. It can be constructed with a connection string or the FQDN of the namespace, for example:

open System // BinaryData and TimeSpan are used in the examples below
open DatatypeChannels.ASB

use channels = Channels.fromFqdn "mynamespace.servicebus.windows.net"
                                 (Azure.Identity.DefaultAzureCredential false)
                                 (Log ignore) // no logging

For other construction parameters please see Channels.mkNew. Internally two connections might be established - one for consumer bindings management and one for data activities. The connections are established just in time for the first use and are cached for the lifetime of the Channels factory.

Converting messages between application and bus representations

Publisher and Consumer implement the opposite ends of a Datatype Channel, meaning that the type of message is determined statically and for the entire lifetime of the channel. Implement OfReceived and ToSend functions to convert from/to service bus primitives.

Here's an example of plain text converters:

module PlainText =
    open Azure.Messaging.ServiceBus
    let ofReceived  =
        fun (sbm: ServiceBusReceivedMessage) -> sbm.Body.ToString()
        |> OfReceived

    let toSend =
        fun (msg:string) -> ServiceBusMessage(Body = BinaryData msg)
        |> ToSend

Any exception during conversion from the bus primitives will result in the message being Nacked and sent to the Dead Letter queue (if configured).
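To illustrate how a failing conversion fits in, here is a minimal sketch of converters that carry whole numbers as text. The WholeNumber module name is hypothetical and chosen only for this illustration; OfReceived and ToSend are the library's wrappers shown above, and the remaining calls are standard .NET and Azure SDK members. If the body is not a valid integer, Int32.Parse throws, which, per the rule above, should lead to the message being Nacked.

```fsharp
module WholeNumber = // hypothetical module name, for illustration only
    open System
    open System.Globalization
    open Azure.Messaging.ServiceBus
    open DatatypeChannels.ASB

    // Int32.Parse throws (for example FormatException) when the body is not an integer,
    // so a malformed message never reaches the application as a value.
    let ofReceived : OfReceived<int> =
        OfReceived (fun (sbm: ServiceBusReceivedMessage) ->
            Int32.Parse(sbm.Body.ToString(), CultureInfo.InvariantCulture))

    // Writes the number as culture-invariant text into the message body.
    let toSend : ToSend<int> =
        ToSend (fun (n: int) ->
            ServiceBusMessage(Body = BinaryData(n.ToString(CultureInfo.InvariantCulture))))
```

Keeping the parse strict means the converter itself decides what counts as a valid message, and anything else is left to the dead-lettering configuration of the entity.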

Publisher

Publisher is just a function that takes a message and returns when completed. There are two ways to obtain it:

let publisher = channels.GetPublisher PlainText.toSend (Topic "topic") // long-living publisher
"test message" |-> channels.UsingPublisher PlainText.toSend (Topic "topic") // one-off publishing, using the infix operator
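A long-living publisher can then be reused for several messages. The sketch below assumes that Publisher<string> can be applied directly to a message and returns a Task<unit>, which is inferred from the description above and from the UsingPublisher signature rather than confirmed; publishAll is a hypothetical helper name.

```fsharp
open System.Threading.Tasks
open FSharp.Control.Tasks.Builders
open DatatypeChannels.ASB

// Hypothetical helper: publishes the messages one after another,
// awaiting each send before starting the next.
// Assumption: Publisher<string> behaves like string -> Task<unit>.
let publishAll (publish: Publisher<string>) (messages: string list) : Task<unit> =
    task {
        for message in messages do
            do! publish message
    }
```

With the publisher obtained above, this could be called as publishAll publisher ["first"; "second"].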

Consumer

Consumer implicitly manages the subscription every time it's created, updating forwarding and rules as needed. A persistent queue can be set up using one or more subscriptions; each one will forward to the queue automatically.

Note: a known issue prevents idle detection on subscriptions that have auto-forwarding defined.

A Temporary queue can be useful for short-lived consumers. It is set up with auto-acknowledgments and a GUID for a name, and it is deleted once the consumer is disposed of. For guaranteed processing, a Persistent queue or a Subscription with explicit Ack/Nack of the messages is recommended.

open Azure.Messaging.ServiceBus.Administration

// define a Subscription binding with no filters
let src = Subscription { Subscription = CreateSubscriptionOptions("mytopic", "mysub", MaxDeliveryCount = 1); Rule = None }

// create a consumer, specifying the conversion function from bus primitives
let consumer = channels.GetConsumer PlainText.ofReceived src

// another example of a source - deadletter queue of the subscription defined above
// this entity must exist, DatatypeChannels.ASB does not manage deadletter sources
let dlq = DeadLetter "mytopic/Subscriptions/mysub" 
let dlqConsumer = channels.GetConsumer PlainText.ofReceived dlq

// define a Persistent queue binding
let persistent = Persistent (CreateQueueOptions("myqueue", LockDuration = TimeSpan.FromMinutes 6.),
                             [{ Subscription = CreateSubscriptionOptions("mytopic", "mysub", MaxDeliveryCount = 1); Rule = None }])

// define a Temporary queue binding, the messages will be auto-ack'ed
let tmp = Temporary [{ Subscription = CreateSubscriptionOptions("mytopic", "mysub", MaxDeliveryCount = 1); Rule = None }]

Once created, consumers can be polled, specifying the timeout:

open FSharp.Control.Tasks.Builders

task {
    let! received = TimeSpan.FromSeconds 3. |> consumer.Get
    // for this example - negative acknowledge the message so that it's routed to the deadletters
    // note: .Value throws if Get timed out and returned None
    do! consumer.Nack received.Value.Id
    let! deadLettered = TimeSpan.FromSeconds 3. |> dlqConsumer.Get // now we can process it from the deadletters
    return deadLettered
}
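Since Get returns an option, a polling step can also handle the timeout case explicitly instead of reading .Value. The following is a minimal sketch under some assumptions: tryProcessOne and handle are hypothetical names, and consumer.Ack is assumed to mirror the Nack call used above (taking the envelope Id and returning a task), based on the mention of explicit Ack/Nack.

```fsharp
open System
open System.Threading.Tasks
open FSharp.Control.Tasks.Builders
open DatatypeChannels.ASB

// Hypothetical helper: polls once and returns true if a message arrived.
// `handle` is a caller-supplied function that reports whether processing succeeded.
let tryProcessOne (consumer: Consumer<'msg>) (handle: Envelope<'msg> -> bool) : Task<bool> =
    task {
        match! consumer.Get (TimeSpan.FromSeconds 3.) with
        | None ->
            // nothing arrived within the timeout
            return false
        | Some envelope ->
            if handle envelope then
                do! consumer.Ack envelope.Id  // assumption: Ack mirrors Nack
            else
                do! consumer.Nack envelope.Id
            return true
    }
```

Returning a flag lets the caller decide whether to poll again immediately or back off when the source is idle.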

Note that:
