azure_iot_operations_services/schema_registry/
client.rsuse std::sync::Arc;
use std::time::Duration;
use azure_iot_operations_mqtt::interface::ManagedClient;
use azure_iot_operations_protocol::application::ApplicationContext;
use azure_iot_operations_protocol::rpc_command;
use crate::schema_registry::schemaregistry_gen::common_types::options::CommandInvokerOptionsBuilder;
use crate::schema_registry::schemaregistry_gen::schema_registry::client as sr_client_gen;
use crate::schema_registry::{
Error, ErrorKind, GetSchemaRequest, PutSchemaRequest, Schema, ServiceError,
};
#[derive(Clone)]
pub struct Client<C>
where
C: ManagedClient + Clone + Send + Sync + 'static,
C::PubReceiver: Send + Sync,
{
get_command_invoker: Arc<sr_client_gen::GetCommandInvoker<C>>,
put_command_invoker: Arc<sr_client_gen::PutCommandInvoker<C>>,
}
impl<C> Client<C>
where
C: ManagedClient + Clone + Send + Sync + 'static,
C::PubReceiver: Send + Sync,
{
pub fn new(application_context: ApplicationContext, client: &C) -> Self {
let options = CommandInvokerOptionsBuilder::default()
.build()
.expect("Statically generated options should not fail.");
Self {
get_command_invoker: Arc::new(sr_client_gen::GetCommandInvoker::new(
application_context.clone(),
client.clone(),
&options,
)),
put_command_invoker: Arc::new(sr_client_gen::PutCommandInvoker::new(
application_context,
client.clone(),
&options,
)),
}
}
pub async fn get(
&self,
get_request: GetSchemaRequest,
timeout: Duration,
) -> Result<Schema, Error> {
let payload = sr_client_gen::GetRequestSchema {
name: get_request.name,
version: get_request.version,
};
let command_request = rpc_command::invoker::RequestBuilder::default()
.payload(payload)
.map_err(ErrorKind::from)?
.timeout(timeout)
.build()
.map_err(ErrorKind::from)?;
let response = self
.get_command_invoker
.invoke(command_request)
.await
.map_err(ErrorKind::from)?
.map_err(|e| Error(ErrorKind::from(ServiceError::from(e.payload))))?;
Ok(response.payload.schema.into())
}
pub async fn put(
&self,
put_request: PutSchemaRequest,
timeout: Duration,
) -> Result<Schema, Error> {
let payload = sr_client_gen::PutRequestSchema {
description: put_request.description,
display_name: put_request.display_name,
format: put_request.format.into(),
schema_content: put_request.schema_content,
schema_type: put_request.schema_type.into(),
tags: if put_request.tags.is_empty() {
None
} else {
Some(put_request.tags)
},
version: put_request.version,
};
let command_request = rpc_command::invoker::RequestBuilder::default()
.payload(payload)
.map_err(ErrorKind::from)?
.timeout(timeout)
.build()
.map_err(ErrorKind::from)?;
let response = self
.put_command_invoker
.invoke(command_request)
.await
.map_err(ErrorKind::from)?
.map_err(|e| ErrorKind::from(ServiceError::from(e.payload)))?;
Ok(response.payload.schema.into())
}
pub async fn shutdown(&self) -> Result<(), Error> {
self.get_command_invoker
.shutdown()
.await
.map_err(ErrorKind::from)?;
self.put_command_invoker
.shutdown()
.await
.map_err(ErrorKind::from)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use azure_iot_operations_mqtt::{
MqttConnectionSettingsBuilder,
session::{Session, SessionOptionsBuilder},
};
use azure_iot_operations_protocol::application::ApplicationContextBuilder;
use crate::schema_registry::{
Client, DEFAULT_SCHEMA_VERSION, Error, ErrorKind, Format, GetSchemaRequestBuilder,
GetSchemaRequestBuilderError, PutSchemaRequestBuilder, PutSchemaRequestBuilderError,
SchemaType,
};
fn create_session() -> Session {
let connection_settings = MqttConnectionSettingsBuilder::default()
.hostname("localhost")
.client_id("test_client")
.build()
.unwrap();
let session_options = SessionOptionsBuilder::default()
.connection_settings(connection_settings)
.build()
.unwrap();
Session::new(session_options).unwrap()
}
const TEST_SCHEMA_NAME: &str = "test_schema_name";
const TEST_SCHEMA_CONTENT: &str = r#"
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"test": {
"type": "integer"
},
}
}
"#;
#[tokio::test]
async fn test_get_request_valid() {
let get_request = GetSchemaRequestBuilder::default()
.name(TEST_SCHEMA_NAME.to_string())
.build()
.unwrap();
assert_eq!(get_request.name, TEST_SCHEMA_NAME);
assert_eq!(get_request.version, DEFAULT_SCHEMA_VERSION.to_string());
}
#[tokio::test]
async fn test_get_request_invalid_name() {
let get_request = GetSchemaRequestBuilder::default().build();
assert!(matches!(
get_request.unwrap_err(),
GetSchemaRequestBuilderError::UninitializedField(_)
));
let get_request = GetSchemaRequestBuilder::default()
.name(String::new())
.build();
assert!(matches!(
get_request.unwrap_err(),
GetSchemaRequestBuilderError::ValidationError(_)
));
}
#[tokio::test]
async fn test_get_request_invalid_version() {
let get_request = GetSchemaRequestBuilder::default()
.name(TEST_SCHEMA_NAME.to_string())
.version(String::new())
.build();
assert!(matches!(
get_request.unwrap_err(),
GetSchemaRequestBuilderError::ValidationError(_)
));
}
#[tokio::test]
async fn test_put_request_invalid_display_name() {
let put_request = PutSchemaRequestBuilder::default()
.schema_content(TEST_SCHEMA_CONTENT.to_string())
.format(Format::JsonSchemaDraft07)
.display_name(String::new())
.build();
assert!(matches!(
put_request.unwrap_err(),
PutSchemaRequestBuilderError::ValidationError(_)
));
}
#[tokio::test]
async fn test_put_request_invalid_version() {
let put_request = PutSchemaRequestBuilder::default()
.schema_content(TEST_SCHEMA_CONTENT.to_string())
.format(Format::JsonSchemaDraft07)
.version(String::new())
.build();
assert!(matches!(
put_request.unwrap_err(),
PutSchemaRequestBuilderError::ValidationError(_)
));
}
#[tokio::test]
async fn test_put_request_invalid_schema_content() {
let put_request = PutSchemaRequestBuilder::default()
.schema_content(String::new())
.format(Format::JsonSchemaDraft07)
.build();
assert!(matches!(
put_request.unwrap_err(),
PutSchemaRequestBuilderError::ValidationError(_)
));
}
#[tokio::test]
async fn test_put_request_valid() {
let put_request = PutSchemaRequestBuilder::default()
.schema_content(TEST_SCHEMA_CONTENT.to_string())
.format(Format::JsonSchemaDraft07)
.build()
.unwrap();
assert_eq!(put_request.schema_content, TEST_SCHEMA_CONTENT);
assert!(matches!(put_request.format, Format::JsonSchemaDraft07));
assert!(matches!(put_request.schema_type, SchemaType::MessageSchema));
assert_eq!(put_request.tags, HashMap::new());
assert_eq!(put_request.version, DEFAULT_SCHEMA_VERSION.to_string());
}
#[tokio::test]
async fn test_get_timeout_invalid() {
let session = create_session();
let client = Client::new(
ApplicationContextBuilder::default().build().unwrap(),
&session.create_managed_client(),
);
let get_result = client
.get(
GetSchemaRequestBuilder::default()
.name(TEST_SCHEMA_NAME.to_string())
.build()
.unwrap(),
std::time::Duration::from_millis(0),
)
.await;
assert!(matches!(
get_result.unwrap_err(),
Error(ErrorKind::InvalidRequestArgument(_))
));
let get_result = client
.get(
GetSchemaRequestBuilder::default()
.name(TEST_SCHEMA_NAME.to_string())
.build()
.unwrap(),
std::time::Duration::from_secs(u64::from(u32::MAX) + 1),
)
.await;
assert!(matches!(
get_result.unwrap_err(),
Error(ErrorKind::InvalidRequestArgument(_))
));
}
#[tokio::test]
async fn test_put_timeout_invalid() {
let session = create_session();
let client = Client::new(
ApplicationContextBuilder::default().build().unwrap(),
&session.create_managed_client(),
);
let put_result = client
.put(
PutSchemaRequestBuilder::default()
.schema_content(TEST_SCHEMA_CONTENT.to_string())
.format(Format::JsonSchemaDraft07)
.build()
.unwrap(),
std::time::Duration::from_millis(0),
)
.await;
assert!(matches!(
put_result.unwrap_err(),
Error(ErrorKind::InvalidRequestArgument(_))
));
let put_result = client
.put(
PutSchemaRequestBuilder::default()
.schema_content(TEST_SCHEMA_CONTENT.to_string())
.format(Format::JsonSchemaDraft07)
.build()
.unwrap(),
std::time::Duration::from_secs(u64::from(u32::MAX) + 1),
)
.await;
assert!(matches!(
put_result.unwrap_err(),
Error(ErrorKind::InvalidRequestArgument(_))
));
}
}