DatatypeChannels.ASB


Data channeling API for Azure Service Bus

DatatypeChannels.ASB implements Datatype Channel pattern on top of official Azure Service Bus clients.

A channel is an abstraction on top of Service Bus entities and it's implemented using topics, subscriptions and queues. The core idea is that while there are many channels, all carrying messages, every channel is dedicated to a single type of message.

It works under assumptions that:

Installing

The DatatypeChannels.ASB library can be installed from NuGet:

dotnet add package DatatypeChannels.ASB

Example

This example demonstrates the complete roundtrip over the channel using DatatypeChannels.ASB API:

open DatatypeChannels.ASB
// create the entry point - DatatypeChannels
let channels = Channels.fromFqdn "mynamespace.servicebus.windows.net"
                                 (Azure.Identity.DefaultAzureCredential false)
                                 (Log ignore) // no logging

// define the "source" - subscription and routing rules, together called "binding"
let src = Subscription { Subscription = CreateSubscriptionOptions("mytopic", "mysub", MaxDeliveryCount = 1), Rule = None }

// create a consumer, specifying the convertion function from bus primitives
let consumer = channels.GetConsumer PlainText.ofReceived src // see the tutorial for details

// create a publisher, specifying the target topic and the conversion function to bus primitives
let publisher = channels.GetPublisher PlainText.toSend (Topic "mytopic")  // see the tutorial for details
Threading.Thread.Sleep 5_000 // majic number - this is how long it takes the topic to start routing messages to new subscriptions
task {
    do! publisher |> Publisher.publish "test-payload"
    let! received = TimeSpan.FromSeconds 3. |> consumer.Get
    printfn "Received: %A" received
    do! consumer.Ack received.Value.Id
}

Note that the API is task-based and the bindings are defined using Azure.Messaging.ServiceBus.Administration types. Constructing a consumer establishes the connection and sets up or updates the subscription to the rule specified. Other types of consumer sources are DeadLetter, Queue and Temporary.

Samples & documentation

Contributing and copyright

The project is hosted on GitHub where you can report issues, fork the project and submit pull requests.

The library is available under MIT license, which allows modification and redistribution for both commercial and non-commercial purposes. For more information see the License file in the GitHub repository.

namespace System
Multiple items
namespace FSharp

--------------------
namespace Microsoft.FSharp
Multiple items
namespace FSharp.Control

--------------------
namespace Microsoft.FSharp.Control
namespace FSharp.Control.Tasks
module Builders

from FSharp.Control.Tasks
namespace Azure
namespace Azure.Messaging
namespace Azure.Messaging.ServiceBus
namespace Azure.Messaging.ServiceBus.Administration
namespace DatatypeChannels
namespace DatatypeChannels.ASB
val channels : Channels
Multiple items
module Channels

from DatatypeChannels.ASB

--------------------
type Channels =
  inherit IDisposable
  abstract member GetConsumer : OfReceived<'msg> -> Source -> Consumer<'msg>
  abstract member GetPublisher : ToSend<'msg> -> Topic -> Publisher<'msg>
  abstract member UsingPublisher : ToSend<'msg> -> Topic -> (Publisher<'msg> -> Task<unit>) -> Task<unit>
val fromFqdn : fqNamespace:string -> credential:Azure.Core.TokenCredential -> log:Log -> Channels
namespace Azure.Identity
Multiple items
type DefaultAzureCredential =
  inherit TokenCredential
  new : unit -> unit + 3 overloads
  member GetToken : requestContext: TokenRequestContext *?cancellationToken: CancellationToken -> AccessToken
  member GetTokenAsync : requestContext: TokenRequestContext *?cancellationToken: CancellationToken -> ValueTask<AccessToken>
  member GetTokenImplAsync : async: bool * requestContext: TokenRequestContext * cancellationToken: CancellationToken -> ValueTask<AccessToken>
  static member GetDefaultAzureCredentialChain : factory: DefaultAzureCredentialFactory * options: DefaultAzureCredentialOptions -> TokenCredential []
  static member GetTokenFromCredentialAsync : credential: TokenCredential * requestContext: TokenRequestContext * async: bool * cancellationToken: CancellationToken -> ValueTask<AccessToken>
  static member GetTokenFromSourcesAsync : sources: TokenCredential [] * requestContext: TokenRequestContext * async: bool * cancellationToken: CancellationToken -> ValueTask<struct (AccessToken * TokenCredential)>
  static val DefaultExceptionMessage : string
  static val UnhandledExceptionMessage : string
  ...

--------------------
Azure.Identity.DefaultAzureCredential(?includeInteractiveCredentials: bool) : Azure.Identity.DefaultAzureCredential
Azure.Identity.DefaultAzureCredential(options: Azure.Identity.DefaultAzureCredentialOptions) : Azure.Identity.DefaultAzureCredential
Multiple items
union case Log.Log: (string * obj [] -> unit) -> Log

--------------------
[<Struct>]
type Log = | Log of (string * obj [] -> unit)
val ignore : value:'T -> unit
val src : Source
union case Source.Subscription: Binding -> Source
Multiple items
type CreateSubscriptionOptions =
  interface IEquatable<CreateSubscriptionOptions>
  new : topicName: string * subscriptionName: string -> unit + 1 overload
  member Equals : obj: obj -> bool + 1 overload
  member GetHashCode : unit -> int
  static member op_Equality : left: CreateSubscriptionOptions * right: CreateSubscriptionOptions -> bool
  static member op_Inequality : left: CreateSubscriptionOptions * right: CreateSubscriptionOptions -> bool
  val ``<DeadLetteringOnMessageExpiration>k__BackingField`` : bool
  val ``<EnableBatchedOperations>k__BackingField`` : bool
  val ``<EnableDeadLetteringOnFilterEvaluationExceptions>k__BackingField`` : bool
  val ``<RequiresSession>k__BackingField`` : bool
  ...

--------------------
CreateSubscriptionOptions(subscription: SubscriptionProperties) : CreateSubscriptionOptions
CreateSubscriptionOptions(topicName: string, subscriptionName: string) : CreateSubscriptionOptions
union case Option.None: Option<'T>
val consumer : Consumer<obj>
abstract member Channels.GetConsumer : OfReceived<'msg> -> Source -> Consumer<'msg>
val publisher : Publisher<string>
abstract member Channels.GetPublisher : ToSend<'msg> -> Topic -> Publisher<'msg>
Multiple items
union case Topic.Topic: string -> Topic

--------------------
module Topic

from DatatypeChannels.ASB

--------------------
[<Struct>]
type Topic = | Topic of string
namespace System.Threading
Multiple items
type Thread =
  inherit CriticalFinalizerObject
  new : start: ParameterizedThreadStart -> unit + 3 overloads
  member Abort : unit -> unit + 1 overload
  member DisableComObjectEagerCleanup : unit -> unit
  member Finalize : unit -> unit
  member GetApartmentState : unit -> ApartmentState
  member GetCompressedStack : unit -> CompressedStack
  member GetHashCode : unit -> int
  member Interrupt : unit -> unit
  member Join : unit -> unit + 2 overloads
  ...

--------------------
Threading.Thread(start: Threading.ParameterizedThreadStart) : Threading.Thread
Threading.Thread(start: Threading.ThreadStart) : Threading.Thread
Threading.Thread(start: Threading.ParameterizedThreadStart, maxStackSize: int) : Threading.Thread
Threading.Thread(start: Threading.ThreadStart, maxStackSize: int) : Threading.Thread
Threading.Thread.Sleep(timeout: TimeSpan) : unit
Threading.Thread.Sleep(millisecondsTimeout: int) : unit
val task : TaskBuilder
Multiple items
union case Publisher.Publisher: ('msg -> Threading.Tasks.Task<unit>) -> Publisher<'msg>

--------------------
module Publisher

from DatatypeChannels.ASB

--------------------
[<Struct>]
type Publisher<'msg> = | Publisher of ('msg -> Task<unit>)
val publish : msg:'msg -> Publisher<'msg> -> Threading.Tasks.Task<unit>
val received : Envelope<obj> option
Multiple items
[<Struct>]
type TimeSpan =
  new : hours: int * minutes: int * seconds: int -> unit + 3 overloads
  member Add : ts: TimeSpan -> TimeSpan
  member CompareTo : value: obj -> int + 1 overload
  member Divide : divisor: float -> TimeSpan + 1 overload
  member Duration : unit -> TimeSpan
  member Equals : value: obj -> bool + 2 overloads
  member GetHashCode : unit -> int
  member Multiply : factor: float -> TimeSpan
  member Negate : unit -> TimeSpan
  member Subtract : ts: TimeSpan -> TimeSpan
  ...

--------------------
TimeSpan ()
TimeSpan(ticks: int64) : TimeSpan
TimeSpan(hours: int, minutes: int, seconds: int) : TimeSpan
TimeSpan(days: int, hours: int, minutes: int, seconds: int) : TimeSpan
TimeSpan(days: int, hours: int, minutes: int, seconds: int, milliseconds: int) : TimeSpan
TimeSpan.FromSeconds(value: float) : TimeSpan
abstract member Consumer.Get : TimeSpan -> Threading.Tasks.Task<Envelope<'msg> option>
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
abstract member Consumer.Ack : string -> Threading.Tasks.Task<unit>
property Option.Value: Envelope<obj> with get
Envelope.Id: string
(c) Microsoft Corporation. All rights reserved.