// Source file: azure_iot_operations_protocol/telemetry/cloud_event.rs
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use std::{
    fmt::{self, Display, Formatter},
    str::FromStr,
};

use chrono::DateTime;
use fluent_uri::{Uri, UriRef};
use regex::Regex;

/// Default spec version for a `CloudEvent`. Compliant event producers MUST
/// use a value of 1.0 when referring to this version of the specification.
///
/// This is also the only spec version that `CloudEventFields::validate` accepts;
/// any other value is rejected as an invalid spec version.
pub const DEFAULT_CLOUD_EVENT_SPEC_VERSION: &str = "1.0";
/// Default `CloudEvent` event type for AIO telemetry.
///
/// Intended as the value of the `type` attribute (`CloudEventFields::EventType`).
pub const DEFAULT_CLOUD_EVENT_EVENT_TYPE: &str = "ms.aio.telemetry";

/// Enum representing the cloud event fields.
///
/// The enum is a plain, fieldless set of attribute identifiers, so it is cheap to copy
/// and can be compared, hashed and used as a key in hash-based collections.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CloudEventFields {
    /// Identifies the event. Producers MUST ensure that source + id is unique for each distinct event.
    /// If a duplicate event is re-sent (e.g. due to a network error) it MAY have the same id.
    /// Consumers MAY assume that Events with identical source and id are duplicates.
    Id,
    /// Identifies the context in which an event happened.
    /// Often this will include information such as the type of the event source,
    /// the organization publishing the event or the process that produced the event.
    /// The exact syntax and semantics behind the data encoded in the URI is defined by the event producer.
    Source,
    /// The version of the `CloudEvents` specification which the event uses. This enables the
    /// interpretation of the context. Compliant event producers MUST use a value of 1.0 when
    /// referring to this version of the specification.
    SpecVersion,
    /// Describes the type of event related to the originating occurrence.
    /// Often this attribute is used for routing, observability, policy enforcement, etc.
    /// The format of this is producer defined and might include information such as the version of the type.
    EventType,
    /// Identifies the subject of the event in the context of the event producer (identified by source).
    /// In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source,
    /// but the source identifier alone might not be sufficient as a qualifier for any specific event
    /// if the source context has internal sub-structure.
    Subject,
    /// Timestamp of when the occurrence happened.
    /// If the time of the occurrence cannot be determined then this attribute MAY be set to some other
    /// time (such as the current time) by the `CloudEvents` producer,
    /// however all producers for the same source MUST be consistent in this respect.
    Time,
    /// Content type of data value. This attribute enables data to carry any type of content,
    /// whereby format and encoding might differ from that of the chosen event format.
    DataContentType,
    /// Identifies the schema that data adheres to.
    /// Incompatible changes to the schema SHOULD be reflected by a different URI.
    DataSchema,
}

impl CloudEventFields {
    /// Validates that the cloud event field is valid based on the spec version.
    ///
    /// # Errors
    /// Returns a string containing the error message if either the field or spec version are invalid.
    ///
    /// # Panics
    /// If any regex fails to compile which is impossible given that the regex is pre-defined.
    pub fn validate(&self, value: &str, spec_version: &str) -> Result<(), String> {
        if spec_version == "1.0" {
            if value.is_empty() {
                return Err(format!("{self} cannot be empty"));
            }
            match self {
                CloudEventFields::Id
                | CloudEventFields::SpecVersion
                | CloudEventFields::EventType
                | CloudEventFields::Subject => {}
                CloudEventFields::Source => {
                    // URI reference
                    match UriRef::parse(value) {
                        Ok(_) => {}
                        Err(e) => {
                            return Err(format!(
                                "Invalid {self} value: {value}. Must adhere to RFC 3986 Section 4.1. Error: {e}"
                            ));
                        }
                    }
                }
                CloudEventFields::DataSchema => {
                    // URI
                    match Uri::parse(value) {
                        Ok(_) => {}
                        Err(e) => {
                            return Err(format!(
                                "Invalid {self} value: {value}. Must adhere to RFC 3986 Section 4.3. Error: {e}"
                            ));
                        }
                    }
                }
                CloudEventFields::Time => {
                    // serializable as RFC3339
                    match DateTime::parse_from_rfc3339(value) {
                        Ok(_) => {}
                        Err(e) => {
                            return Err(format!(
                                "Invalid {self} value: {value}. Must adhere to RFC 3339. Error: {e}"
                            ));
                        }
                    }
                }
                CloudEventFields::DataContentType => {
                    let rfc_2045_regex =
                        Regex::new(r"^([-a-z]+)/([-a-z0-9.]+)(?:\+([-a-z0-9.]+))?$")
                            .expect("Static regex string should not fail");

                    if !rfc_2045_regex.is_match(value) {
                        return Err(format!(
                            "Invalid {self} value: {value}. Must adhere to RFC 2045"
                        ));
                    }
                }
            }
        } else {
            return Err(format!("Invalid spec version: {spec_version}"));
        }
        Ok(())
    }
}

impl Display for CloudEventFields {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            CloudEventFields::SpecVersion => write!(f, "specversion"),
            CloudEventFields::EventType => write!(f, "type"),
            CloudEventFields::Source => write!(f, "source"),
            CloudEventFields::Id => write!(f, "id"),
            CloudEventFields::Subject => write!(f, "subject"),
            CloudEventFields::Time => write!(f, "time"),
            CloudEventFields::DataContentType => write!(f, "datacontenttype"),
            CloudEventFields::DataSchema => write!(f, "dataschema"),
        }
    }
}

impl FromStr for CloudEventFields {
    type Err = ();

    /// Parses a cloud event attribute name into its field.
    ///
    /// Matching is exact and case-sensitive; any string that is not one of the known
    /// attribute names yields `Err(())`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Attribute name -> field lookup table.
        const KNOWN_FIELDS: [(&str, CloudEventFields); 8] = [
            ("id", CloudEventFields::Id),
            ("source", CloudEventFields::Source),
            ("specversion", CloudEventFields::SpecVersion),
            ("type", CloudEventFields::EventType),
            ("subject", CloudEventFields::Subject),
            ("dataschema", CloudEventFields::DataSchema),
            ("datacontenttype", CloudEventFields::DataContentType),
            ("time", CloudEventFields::Time),
        ];

        KNOWN_FIELDS
            .iter()
            .find(|(name, _)| *name == s)
            .map(|&(_, field)| field)
            .ok_or(())
    }
}

#[cfg(test)]
mod tests {
    use test_case::test_case;

    use super::*;

    // Every field must survive a `Display` -> `FromStr` round trip unchanged.
    #[test_case(CloudEventFields::SpecVersion; "cloud_event_spec_version")]
    #[test_case(CloudEventFields::EventType; "cloud_event_type")]
    #[test_case(CloudEventFields::Source; "cloud_event_source")]
    #[test_case(CloudEventFields::Id; "cloud_event_id")]
    #[test_case(CloudEventFields::Subject; "cloud_event_subject")]
    #[test_case(CloudEventFields::Time; "cloud_event_time")]
    #[test_case(CloudEventFields::DataContentType; "cloud_event_data_content_type")]
    #[test_case(CloudEventFields::DataSchema; "cloud_event_data_schema")]
    fn test_cloud_event_to_from_string(prop: CloudEventFields) {
        let rendered = prop.to_string();
        let round_tripped: CloudEventFields = rendered.parse().unwrap();
        assert_eq!(prop, round_tripped);
    }

    // An empty value is rejected for every field under the default spec version.
    #[test]
    fn test_cloud_event_validate_empty() {
        let all_fields = [
            CloudEventFields::Id,
            CloudEventFields::Source,
            CloudEventFields::SpecVersion,
            CloudEventFields::EventType,
            CloudEventFields::DataSchema,
            CloudEventFields::Subject,
            CloudEventFields::Time,
            CloudEventFields::DataContentType,
        ];

        for field in all_fields {
            field
                .validate("", DEFAULT_CLOUD_EVENT_SPEC_VERSION)
                .unwrap_err();
        }
    }

    // `source` accepts both absolute URIs and relative references.
    #[test_case("aio://oven/sample", true; "absolute_uri")]
    #[test_case("./bar", true; "uri_reference")]
    #[test_case("::::", false; "not_uri_reference")]
    fn test_cloud_event_validate_invalid_source(source: &str, expected: bool) {
        let outcome = CloudEventFields::Source.validate(source, DEFAULT_CLOUD_EVENT_SPEC_VERSION);
        assert_eq!(outcome.is_ok(), expected);
    }

    // `dataschema` must be a full URI; a relative reference is not enough.
    #[test_case("aio://oven/sample", true; "absolute_uri")]
    #[test_case("./bar", false; "uri_reference")]
    #[test_case("ht!tp://example.com", false; "invalid_uri")]
    fn test_cloud_event_validate_data_schema(data_schema: &str, expected: bool) {
        let outcome =
            CloudEventFields::DataSchema.validate(data_schema, DEFAULT_CLOUD_EVENT_SPEC_VERSION);
        assert_eq!(outcome.is_ok(), expected);
    }

    // `datacontenttype` must look like `type/subtype` with an optional `+suffix`.
    #[test_case("application/json", true; "json")]
    #[test_case("text/csv", true; "csv")]
    #[test_case("application/avro", true; "avro")]
    #[test_case("application/octet-stream", true; "dash_second_half")]
    #[test_case("application/f0o", true; "number_second_half")]
    #[test_case("application/f.o", true; "period_second_half")]
    #[test_case("foo/bar+bazz", true; "plus_extra")]
    #[test_case("f0o/json", false; "number_first_half")]
    #[test_case("foo", false; "no_slash")]
    #[test_case("foo/bar?bazz", false; "question_mark")]
    fn test_cloud_event_validate_data_content_type(data_content_type: &str, expected: bool) {
        let outcome = CloudEventFields::DataContentType
            .validate(data_content_type, DEFAULT_CLOUD_EVENT_SPEC_VERSION);
        assert_eq!(outcome.is_ok(), expected);
    }

    // Any spec version other than the supported one is rejected, even for a valid value.
    #[test]
    fn test_cloud_event_validate_invalid_spec_version() {
        let unsupported_spec_version = "0.0";
        CloudEventFields::Id
            .validate("id", unsupported_spec_version)
            .unwrap_err();
    }
}