Commit 5614c5f5 authored by Ryan Berkheimer's avatar Ryan Berkheimer
Browse files

Completed upgrade to split DefaultSession into ParallelSession and...

Completed upgrade to split DefaultSession into ParallelSession and SequentialSession; upgrade StandardSession into SimpleParallelSession and SimpleSequentialSession; add new test for creation of parallelsession; added export methods in the C lib for all session types; need to add more comprehensive testing for parallel sessions
parent 88a78f50
......@@ -93,7 +93,7 @@ This package includes the MessageAPI specification as a set of interfaces, usefu
Using MessageAPI means taking a data-messaging process, defining the process alphabet in terms of fields and conditions, determining how those fields are contained as different record permutations (including any transformations or conditions that affect that containment), and determining what to do with the records. All this information is then laid out in the session manifest and parameter map, and then data is moved through the initialized session by submitting records as either discrete requests, or resubmitting new records as a stream on the same request.
As previously described, in the provided implementation of the DefaultSession, records can be moved through the system in batch by using separate requests, or in stream by reusing the same request. When streaming over the same request, endpoints have a shared state over submissions.
As previously described, in the provided implementation of the SequentialSession, records can be moved through the system in batch by using separate requests, or in stream by reusing the same request. When streaming over the same request, endpoints have a shared state over submissions.
The MessageAPI information model manifest is completely pluggable and is read when specifically referenced at runtime in code (i.e., ISession s = new Session('manifest.json')). This allows targeted improvements or modifications of the standard. Want to change the containers to ship somewhere else? Build Docker containers? Make Requests enforce always-batch behavior? Change the container plugin, or another plugin, and reuse the rest.
......@@ -101,11 +101,11 @@ The complete pluggability of MessageAPI also provides another powerful potential
The default MessageAPI session provides a standard topology that will suit many messaging tasks (either publishing or consuming oriented, batch or stream oriented), along with providing a matching set of basic plugins. The provided implementations are well suited for individual or distributed use. For example, a MessageAPI session could be wrapped as a Kafka consumer, deployed as a single runtime pod in Kubernetes as a consumer group, started up, and then fed records coming from a service. Or a session could be used intra-process to send emails to different groups based on different conditions. Or, as recently discovered, MessageAPI could serve as the in-core engine for custom Apache NiFi processors. There are a lot of varied uses.
The following is an example of a DefaultSession manifest:
The following is an example of a SequentialSession manifest:
```json
{
"plugin": "gov.noaa.messageapi.sessions.DefaultSession",
"plugin": "gov.noaa.messageapi.sessions.SequentialSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
......@@ -564,9 +564,9 @@ or
```java
import gov.noaa.messageapi.interfaces.ISession;
import gov.noaa.messageapi.sessions.DefaultSession;
import gov.noaa.messageapi.sessions.SequentialSession;
ISession session = new DefaultSession("/path/to/session_manifest.json");
ISession session = new SequentialSession("/path/to/session_manifest.json");
```
Using the created Session object, create a Request - the Request type was baked into the session on session creation. All fields and conditions that define the Request Record template will be available from the same Record.
......
......@@ -64,7 +64,7 @@ test {
testLogging {
showStandardStreams = true
}
environment "MESSAGEAPI_SESSION_TEMPLATE_DIR", "/workspaces/messageapi/resources/test/standard_session"
environment "MESSAGEAPI_SESSION_TEMPLATE_DIR", "/workspaces/messageapi/resources/test/session_templates"
}
......
%% Cell type:code id: tags:
``` java
List<String> added = %jars /workspaces/messageapi/build/libs/messageapi-core-1.0.0.jar
```
%% Cell type:code id: tags:
``` java
import gov.noaa.messageapi.interfaces.ISession;
```
%% Cell type:code id: tags:
``` java
import gov.noaa.messageapi.sessions.StandardSession;
import gov.noaa.messageapi.sessions.SimpleSequentialSession;
```
%% Cell type:code id: tags:
``` java
ISession session = new StandardSession("/workspaces/messageapi/resources/test/native-condition/parameters.json");
ISession session = new SimpleSequentialSession("/workspaces/messageapi/resources/test/native-condition/parameters.json");
```
%% Cell type:code id: tags:
``` java
import gov.noaa.messageapi.interfaces.*;
```
%% Cell type:code id: tags:
``` java
IRequest request = session.createRequest()
```
%% Cell type:code id: tags:
``` java
IRecord record = request.createRecord();
IRecord record2 = request.createRecord();
```
%% Cell type:code id: tags:
``` java
record.setField("int-field", 1);
record2.setField("int-field", 100);
```
%% Cell type:code id: tags:
``` java
record.getField("int-field").getValue()
```
%%%% Output: execute_result
1
%% Cell type:code id: tags:
``` java
IResponse response = request.submit()
```
%% Cell type:code id: tags:
``` java
response.isComplete()
```
%%%% Output: execute_result
true
%% Cell type:code id: tags:
``` java
response.getRecords().size()
```
%%%% Output: execute_result
3
%% Cell type:code id: tags:
``` java
response.getRecords().forEach(r -> r.getFields().forEach(f->System.out.println(f.getId() + ", " + f.getValue())))
```
%%%% Output: stream
int-field, 100
int-field, 1
int-field, 100
%% Cell type:code id: tags:
``` java
IResponse response2 = request.submit()
```
%% Cell type:code id: tags:
``` java
response2.isComplete()
```
%%%% Output: execute_result
true
%% Cell type:code id: tags:
``` java
response2.getRecords().size()
```
%%%% Output: execute_result
3
%% Cell type:code id: tags:
``` java
record.getField("int-field").getValue()
```
%%%% Output: execute_result
1
%% Cell type:code id: tags:
``` java
record.getFields()
```
%%%% Output: execute_result
[gov.noaa.messageapi.fields.DefaultField@44561e47]
%% Cell type:code id: tags:
``` java
record.getFields().forEach(f->System.out.println(f.getId() + ", " + f.getValue()))
```
%%%% Output: stream
int-field, 1
%% Cell type:code id: tags:
``` java
response.getRecords().size()
```
%%%% Output: execute_result
3
%% Cell type:code id: tags:
``` java
response.getRejections().size()
```
%%%% Output: execute_result
0
%% Cell type:code id: tags:
``` java
response2.getRejections().size()
```
%%%% Output: execute_result
0
%% Cell type:code id: tags:
``` java
ISession fileReaderSession = new StandardSession("/workspaces/messageapi/resources/test/file-reader/parameter_map_style.json")
ISession fileReaderSession = new SimpleSequentialSession("/workspaces/messageapi/resources/test/file-reader/parameter_map_style.json")
```
%% Cell type:code id: tags:
``` java
IRequest fileReaderRequest = fileReaderSession.createRequest();
```
%% Cell type:code id: tags:
``` java
IRecord record = fileReaderRequest.createRecord();
```
%% Cell type:code id: tags:
``` java
record.getFields().forEach(f->System.out.println(f.getId() + ", " + f.getValue()))
```
%%%% Output: stream
file-path, null
%% Cell type:code id: tags:
``` java
record.setField("file-path", "/workspaces/messageapi/build/resources/test/inputs/file-reader/proc_sm_gtsnp_data_ftp_CF6_cf6_20190506.txt");
```
%% Cell type:code id: tags:
``` java
IResponse fileReaderResponse = fileReaderRequest.submit();
fileReaderResponse.isComplete();
```
%%%% Output: execute_result
false
%% Cell type:code id: tags:
``` java
fileReaderResponse.isComplete();
```
%%%% Output: execute_result
true
%% Cell type:code id: tags:
``` java
fileReaderResponse.getRecords().size();
```
%%%% Output: execute_result
79794
%% Cell type:code id: tags:
``` java
fileReaderResponse.getRecords().get(100);
```
%%%% Output: execute_result
gov.noaa.messageapi.records.schema.SchemaRecord@5470b59a
%% Cell type:code id: tags:
``` java
fileReaderResponse.getRecords().get(100).getFields().forEach(f -> System.out.println(f.getId() + ", " + f.getValue()))
```
%%%% Output: stream
value, # LAST OF SEVERAL OCCURRENCES
length, 29
number, 100
%% Cell type:code id: tags:
``` java
fileReaderResponse.getRecords().get(99).getField("value").getValue()
```
%%%% Output: execute_result
NOTES:
%% Cell type:code id: tags:
``` java
import java.util.List;
```
%% Cell type:markdown id: tags:
**FTP Lister Using a Standard Session**
%% Cell type:code id: tags:
``` java
import gov.noaa.messageapi.sessions.StandardSession;
import gov.noaa.messageapi.sessions.SimpleSequentialSession;
import gov.noaa.messageapi.interfaces.*;
```
%% Cell type:code id: tags:
``` java
String parameterMap = "/workspaces/messageapi/resources/test/ftp-lister/parameters_map_style.json";
ISession ftpListSession = new StandardSession(parameterMap);
ISession ftpListSession = new SimpleSequentialSession(parameterMap);
IRequest ftpListRequest = ftpListSession.createRequest();
IRecord ftpListRecord = ftpListRequest.createRecord();
ftpListRecord.setField("directory", "pub/download/hidden");
IResponse ftpListResponse = ftpListRequest.submit();
while (!ftpListResponse.isComplete()) {}
System.out.println("Return record count: " + ftpListResponse.getRecords().size());
ftpListResponse.getRecords().forEach(ftpRecord -> {
System.out.println(ftpRecord.getField("name").getValue());
System.out.println(ftpRecord.getField("server").getValue());
System.out.println(ftpRecord.getField("path").getValue());
System.out.println(ftpRecord.getField("size").getValue());
System.out.println(ftpRecord.getField("type").getValue());
System.out.println(ftpRecord.getField("modified").getValue());
});
ftpListResponse.getRejections().forEach(ftpRejection -> {
System.out.println("Rejection!");
});
```
%%%% Output: stream
Return record count: 0
%% Cell type:code id: tags:
``` java
```
%% Cell type:code id: tags:
``` java
```
......
{
"plugin": "gov.noaa.messageapi.sessions.DefaultSession",
"plugin": "gov.noaa.messageapi.sessions.ParallelSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
......
{
"plugin": "gov.noaa.messageapi.sessions.SequentialSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
"constructor": {
"metadata": "{_schema_metadata_}",
"fields": "{_standard_}",
"conditions": "{_standard_}"
}
},
"container": {
"plugin": "gov.noaa.messageapi.containers.DefaultContainer",
"constructor": {
"metadata": "{_container_metadata_}",
"collections": "{_standard_}",
"transformations": "{_standard_}"
}
},
"protocol": {
"plugin": "gov.noaa.messageapi.protocols.DefaultProtocol",
"constructor": {
"metadata": "{_protocol_metadata_}",
"endpoints": "{_standard_}"
}
}
}
}
\ No newline at end of file
{
"plugin": "gov.noaa.messageapi.sessions.DefaultSession",
"plugin": "gov.noaa.messageapi.sessions.SequentialSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
......
{
"plugin": "gov.noaa.messageapi.sessions.DefaultSession",
"plugin": "gov.noaa.messageapi.sessions.SequentialSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
......
{
"plugin": "gov.noaa.messageapi.sessions.DefaultSession",
"plugin": "gov.noaa.messageapi.sessions.SequentialSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
......
{
"plugin": "gov.noaa.messageapi.sessions.DefaultSession",
"plugin": "gov.noaa.messageapi.sessions.SequentialSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
......
{
"plugin": "gov.noaa.messageapi.sessions.DefaultSession",
"plugin": "gov.noaa.messageapi.sessions.SequentialSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
......
{
"plugin": "gov.noaa.messageapi.sessions.DefaultSession",
"plugin": "gov.noaa.messageapi.sessions.ParallelSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
......
{
"plugin": "gov.noaa.messageapi.sessions.SequentialSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
"constructor": {
"metadata": "{_schema_metadata_}",
"fields": "{_standard_}",
"conditions": "{_standard_}"
}
},
"container": {
"plugin": "gov.noaa.messageapi.containers.DefaultContainer",
"constructor": {
"metadata": "{_container_metadata_}",
"collections": "{_standard_}",
"transformations": "{_standard_}"
}
},
"protocol": {
"plugin": "gov.noaa.messageapi.protocols.DefaultProtocol",
"constructor": {
"metadata": "{_protocol_metadata_}",
"endpoints": "{_standard_}"
}
}
}
}
\ No newline at end of file
{
"plugin": "gov.noaa.messageapi.sessions.SequentialSession",
"constructor": {
"schema": {
"plugin": "gov.noaa.messageapi.schemas.DefaultSchema",
"constructor": {
"metadata": "{_schema_metadata_}",
"fields": "{_standard_}",
"conditions": "{_standard_}"
}
},
"container": {
"plugin": "gov.noaa.messageapi.containers.DefaultContainer",
"constructor": {
"metadata": "{_container_metadata_}",
"collections": "{_standard_}",
"transformations": "{_standard_}"
}
},
"protocol": {
"plugin": "gov.noaa.messageapi.protocols.DefaultProtocol",
"constructor": {
"metadata": "{_protocol_metadata_}",
"endpoints": "{_standard_}"
}
}
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment