Commit 88a78f50 authored by Ryan Berkheimer's avatar Ryan Berkheimer
Browse files

checkpoint commit during addition/split of parallel and sequential sessions

parent a0b99c3b
%% Cell type:code id: tags:
``` java
List<String> added = %jars /workspaces/messageapi/build/libs/messageapi-core-1.0.0.jar
```
%% Cell type:code id: tags:
``` java
import gov.noaa.messageapi.interfaces.ISession;
```
%% Cell type:code id: tags:
``` java
import gov.noaa.messageapi.sessions.StandardSession;
```
%% Cell type:code id: tags:
``` java
ISession session = new StandardSession("/workspaces/messageapi/resources/test/native-condition/parameters.json");
```
%% Cell type:code id: tags:
``` java
import gov.noaa.messageapi.interfaces.*;
```
%% Cell type:code id: tags:
``` java
IRequest request = session.createRequest()
```
%% Cell type:code id: tags:
``` java
IRecord record = request.createRecord();
IRecord record2 = request.createRecord();
```
%% Cell type:code id: tags:
``` java
record.setField("int-field", 1);
record2.setField("int-field", 100);
```
%% Cell type:code id: tags:
``` java
record.getField("int-field").getValue()
```
%%%% Output: execute_result
1
%% Cell type:code id: tags:
``` java
IResponse response = request.submit()
```
%% Cell type:code id: tags:
``` java
response.isComplete()
```
%%%% Output: execute_result
true
%% Cell type:code id: tags:
``` java
response.getRecords().size()
```
%%%% Output: execute_result
3
%% Cell type:code id: tags:
``` java
response.getRecords().forEach(r -> r.getFields().forEach(f->System.out.println(f.getId() + ", " + f.getValue())))
```
%%%% Output: stream
int-field, 100
int-field, 100
int-field, 1
int-field, 100
%% Cell type:code id: tags:
``` java
IResponse response2 = request.submit()
```
%% Cell type:code id: tags:
``` java
response2.isComplete()
```
%%%% Output: execute_result
true
%% Cell type:code id: tags:
``` java
response2.getRecords().size()
```
%%%% Output: execute_result
3
%% Cell type:code id: tags:
``` java
record.getField("int-field").getValue()
```
%%%% Output: execute_result
1
%% Cell type:code id: tags:
``` java
record.getFields()
```
%%%% Output: execute_result
[gov.noaa.messageapi.fields.DefaultField@233d0729]
[gov.noaa.messageapi.fields.DefaultField@44561e47]
%% Cell type:code id: tags:
``` java
record.getFields().forEach(f->System.out.println(f.getId() + ", " + f.getValue()))
```
%%%% Output: stream
int-field, 1
%% Cell type:code id: tags:
``` java
response.getRecords().size()
```
%%%% Output: execute_result
3
%% Cell type:code id: tags:
``` java
response.getRejections().size()
```
%%%% Output: execute_result
0
%% Cell type:code id: tags:
``` java
response2.getRejections().size()
```
%%%% Output: execute_result
0
%% Cell type:code id: tags:
``` java
ISession fileReaderSession = new StandardSession("/workspaces/messageapi/resources/test/file-reader/parameter_map_style.json")
```
%% Cell type:code id: tags:
``` java
IRequest fileReaderRequest = fileReaderSession.createRequest();
```
%% Cell type:code id: tags:
``` java
IRecord record = fileReaderRequest.createRecord();
```
%% Cell type:code id: tags:
``` java
record.getFields().forEach(f->System.out.println(f.getId() + ", " + f.getValue()))
```
%%%% Output: stream
file-path, null
%% Cell type:code id: tags:
``` java
record.setField("file-path", "/workspaces/messageapi/build/resources/test/inputs/file-reader/proc_sm_gtsnp_data_ftp_CF6_cf6_20190506.txt");
```
%% Cell type:code id: tags:
``` java
IResponse fileReaderResponse = fileReaderRequest.submit();
fileReaderResponse.isComplete();
```
%%%% Output: execute_result
false
%% Cell type:code id: tags:
``` java
fileReaderResponse.isComplete();
```
%%%% Output: execute_result
false
true
%% Cell type:code id: tags:
``` java
fileReaderResponse.getRecords().size();
```
%%%% Output: execute_result
79794
%% Cell type:code id: tags:
``` java
fileReaderResponse.getRecords().get(100);
```
%%%% Output: execute_result
gov.noaa.messageapi.records.schema.SchemaRecord@3136b65f
gov.noaa.messageapi.records.schema.SchemaRecord@5470b59a
%% Cell type:code id: tags:
``` java
fileReaderResponse.getRecords().get(100).getFields().forEach(f -> System.out.println(f.getId() + ", " + f.getValue()))
```
%%%% Output: stream
value, # LAST OF SEVERAL OCCURRENCES
length, 29
number, 100
%% Cell type:code id: tags:
``` java
fileReaderResponse.getRecords().get(99).getField("value").getValue()
```
%%%% Output: execute_result
NOTES:
%% Cell type:code id: tags:
``` java
import java.util.List;
```
%% Cell type:markdown id: tags:
**FTP Lister Using a Standard Session**
%% Cell type:code id: tags:
``` java
import gov.noaa.messageapi.sessions.StandardSession;
import gov.noaa.messageapi.interfaces.*;
```
%% Cell type:code id: tags:
``` java
String parameterMap = "/workspaces/messageapi/resources/test/ftp-lister/parameters_map_style.json";
ISession ftpListSession = new StandardSession(parameterMap);
IRequest ftpListRequest = ftpListSession.createRequest();
IRecord ftpListRecord = ftpListRequest.createRecord();
ftpListRecord.setField("directory", "pub/download/hidden");
IResponse ftpListResponse = ftpListRequest.submit();
while (!ftpListResponse.isComplete()) {}
System.out.println("Return record count: " + ftpListResponse.getRecords().size());
ftpListResponse.getRecords().forEach(ftpRecord -> {
System.out.println(ftpRecord.getField("name").getValue());
System.out.println(ftpRecord.getField("server").getValue());
System.out.println(ftpRecord.getField("path").getValue());
System.out.println(ftpRecord.getField("size").getValue());
System.out.println(ftpRecord.getField("type").getValue());
System.out.println(ftpRecord.getField("modified").getValue());
});
ftpListResponse.getRejections().forEach(ftpRejection -> {
System.out.println("Rejection!");
});
```
%%%% Output: stream
Return record count: 0
%% Cell type:code id: tags:
``` java
```
%% Cell type:code id: tags:
``` java
```
......
......@@ -5,18 +5,18 @@
int main(int argc, char **argv)
{
puts("In our native session demo test!\n");
puts("In our native sequential session demo test!\n");
puts("Hello, World\n");
fflush(stdout);
session *session = createSession("/workspaces/messageapi/resources/test/file-reader-native/manifest.json");
session *session = createSequentialSession("/workspaces/messageapi/resources/test/file-reader-native/manifest.json");
puts("Successfully created session.");
puts("Successfully created sequential session.");
fflush(stdout);
request *request1 = createRequest(session);
puts("Successfully created a request.");
puts("Successfully created a request for the sequential session.");
fflush(stdout);
record *record1 = createRequestRecord(session, request1);
......
......@@ -2,10 +2,25 @@
extern "C"
{
session *createSequentialSession(const char *spec) {
return createSession("gov/noaa/messageapi/sessions/SequentialSession", spec);
}
session *createParallelSession(const char *spec)
{
return createSession("gov/noaa/messageapi/sessions/ParallelSession", spec);
}
/**
* Creates the default session.
*/
session *createSession(const char *spec)
* Creates a default session. A default session requires a
*/
session *createDefaultSession(const char *spec)
{
return createSession("gov/noaa/messageapi/sessions/DefaultSession", spec);
}
session *createSession(const char *sessionType, const char *spec)
{
JavaVM *vm;
JNIEnv *env;
......@@ -30,7 +45,7 @@ extern "C"
return NULL;
}
jclass sessionClass = JniUtils::getNamedClass(env, "gov/noaa/messageapi/sessions/DefaultSession");
jclass sessionClass = JniUtils::getNamedClass(env, sessionType);
jmethodID createSessionMethodId = JniUtils::getMethod(env, sessionClass, "<init>", "(Ljava/lang/String;)V", false);
jstring jSpec = env->NewStringUTF(spec);
jobject jSession = env->NewObject(sessionClass, createSessionMethodId, jSpec);
......
......@@ -16,7 +16,10 @@ extern "C"
#endif
/* Session methods */
extern session *createSession(const char* specPath);
extern session *createSequentialSession(const char* specPath);
extern session *createParallelSession(const char *specPath);
extern session *createDefaultSession(const char *specPath);
extern session *createSession(const char* sessionType, const char* specPath);
extern void releaseSession(session *session);
extern request *createRequest(session *session);
......
......@@ -244,6 +244,9 @@ void ListUtils::addListItem(struct val_list *list, struct val_list *val)
list->count += 1;
}
/**
* Adds a val_map (backed by java 'Map') type item to the specified list
*/
void ListUtils::addMapItem(struct val_list *list, struct val_map *val)
{
this->jvm->CallVoidMethod(list->jlist, this->addListItemMethod(), val->jmap);
......
package gov.noaa.messageapi.sessions;
import static java.lang.String.join;
import java.io.File;
import java.util.Map;
import gov.noaa.messageapi.exceptions.MessageApiException;
import gov.noaa.messageapi.utils.general.JsonUtils;
import gov.noaa.messageapi.interfaces.IRequest;
import gov.noaa.messageapi.interfaces.ISession;
/**
* The SequentialSession allows users to ignore the system manifest and pass only
* a fully qualified path to their parameter configuration. Once created, the
* StandardSession should be used as an ISession.
*
* @author Ryan Berkheimer
*/
public class SequentialSession extends DefaultSession {
final static String standardSessionName = "standard_session.json";
final static String schemaMetadataName = "schema.json";
final static String containerMetadataName = "container.json";
final static String protocolMetadataName = "protocol.json";
/**
* Constructs a new publish session directly from a text map. The text map
* should contain all the properties needed for Session Construction.
*
* @param parameterSpec A text based map containing session construction
* parameters
* @throws Exception Throws exception if error creating session
*/
public SequentialSession(final String parameterSpec) throws Exception {
super(SequentialSession.buildSequentialSession(parameterSpec));
}
@Override
public IRequest createRequest() {
return new SequentialRequest(this.getSchema(), this.getContainer(), this.getProtocol());
}
/**
* Accesses the installed template file using an env var, parses it into json,
* and then updates targeted keys to
*/
private static ISession buildStandardSession(final String fqParamSpec) throws Exception {
final Map<String, Object> parameterMap = StandardSession.parseParameterMap(fqParamSpec);
final String sessionTemplateDir = System.getenv("MESSAGEAPI_SESSION_TEMPLATE_DIR");
final String sessionTemplateFQName = String.join(File.separator, sessionTemplateDir, standardSessionName);
final Map<String, Object> sessionMap = StandardSession.parseSessionMap(sessionTemplateFQName);
final String schemaMetadataFQName = String.join(File.separator, sessionTemplateDir, schemaMetadataName);
final String containerMetadataFQName = join(File.separator, sessionTemplateDir, containerMetadataName);
final String protocolMetadataFQName = String.join(File.separator, sessionTemplateDir, protocolMetadataName);
if (parameterMap.containsKey("fields")) {
StandardSession.setMapValue(sessionMap, "schema", "fields", fqParamSpec);
} else {
throw new MessageApiException(StandardSession.getMissingKeyErrorMessage("fields"));
}
if (parameterMap.containsKey("collections")) {
StandardSession.setMapValue(sessionMap, "container", "collections", fqParamSpec);
} else {
throw new MessageApiException(StandardSession.getMissingKeyErrorMessage("collections"));
}
if (parameterMap.containsKey("endpoints")) {
StandardSession.setMapValue(sessionMap, "protocol", "endpoints", fqParamSpec);
} else {
throw new MessageApiException(StandardSession.getMissingKeyErrorMessage("endpoints"));
}
if (parameterMap.containsKey("conditions")) {
StandardSession.setMapValue(sessionMap, "schema", "conditions", fqParamSpec);
} else {
StandardSession.removeMapKey(sessionMap, "schema", "conditions");
}
if (parameterMap.containsKey("transformations")) {
StandardSession.setMapValue(sessionMap, "container", "transformations", fqParamSpec);
} else {
StandardSession.removeMapKey(sessionMap, "container", "transformations");
}
StandardSession.setMapValue(sessionMap, "schema", "metadata", schemaMetadataFQName);
StandardSession.setMapValue(sessionMap, "container", "metadata", containerMetadataFQName);
StandardSession.setMapValue(sessionMap, "protocol", "metadata", protocolMetadataFQName);
return new DefaultSession(sessionMap);
}
/**
* Parses the session map and checks for installation errors.
*/
private static Map<String, Object> parseSessionMap(String sessionTemplateFQName) throws Exception {
try {
return JsonUtils.convertObject(JsonUtils.parse(sessionTemplateFQName));
} catch (Exception e) {
throw new MessageApiException("Failed trying to parse the standard session. Check your MessageAPI installation.");
}
}
/**
* Parses the parameter map and checks for configuration errors.
*/
private static Map<String, Object> parseParameterMap(String fqParamSpec) throws Exception {
try {
return JsonUtils.convertObject(JsonUtils.parse(fqParamSpec));
} catch (Exception e) {
throw new MessageApiException("Failed initial configuration parsing check. Double check your JSON structure.");
}
}
/**
* Sets a map value for a key on a layer (e.g., schema/container/protocol and a component)
*/
@SuppressWarnings("unchecked")
private static void setMapValue(final Map<String, Object> map, final String layer, final String component,
final String newValue) {
((Map<String, Object>) ((Map<String, Object>) ((Map<String, Object>) map.get("constructor")).get(layer))
.get("constructor")).put(component, newValue);
}
/**
* Removes a component map key attached to a given layer
*/
@SuppressWarnings("unchecked")
private static void removeMapKey(final Map<String, Object> map, final String layer, final String component) {
((Map<String, Object>) ((Map<String, Object>) ((Map<String, Object>) map.get("constructor")).get(layer))
.get("constructor")).keySet().remove(component);
}
/**
* Builds a missing required parameter key error message for the specified component.
*/
private static String getMissingKeyErrorMessage(final String key) {
return String.format("Your parameter map is missing the required '%s' key. Check your configuration. Stopping session building now.", key);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment