Commit eb1093cf authored by Ryan Berkheimer's avatar Ryan Berkheimer
Browse files

updated parallel session to include parallel version of most streams used in response sequence

parent 1b8891f2
Pipeline #6008 failed with stage
in 0 seconds
......@@ -10,6 +10,7 @@ import gov.noaa.messageapi.interfaces.IRecord
import gov.noaa.messageapi.sessions.SequentialSession
import gov.noaa.messageapi.sessions.SimpleSequentialSession
import gov.noaa.messageapi.sessions.SimpleParallelSession
import gov.noaa.messageapi.utils.general.PathUtils
......@@ -33,7 +34,7 @@ class FileReaderTests extends spock.lang.Specification {
response.getRecords().size() == 7
}*/
def "Tests submission of a full file reader task with 1 large input using a Default Session."() {
def "Tests submission of a full file reader task with 1 large input using a Sequential Session."() {
given: 'A default session condition test request'
ISession session = new SequentialSession('{}/resources/test/file-reader/manifest.json')
IRequest request = session.createRequest()
......@@ -48,7 +49,7 @@ def "Tests submission of a full file reader task with 1 large input using a Defa
response.getRecords().size() == 79794
}
def "Tests submission of a full file reader task with 1 large input using a Standard Session."() {
def "Tests submission of a full file reader task with 1 large input using a Simple Sequential Session."() {
given: 'A standard session based condition test request'
String parameterPath = PathUtils.reconcileKeywords('{}/resources/test/file-reader/parameter_map_style.json')
println parameterPath
......@@ -72,5 +73,30 @@ def "Tests submission of a full file reader task with 1 large input using a Stan
response.getRecords().size() == 79794
}
def "Tests submission of a full file reader task with 1 large input using a Simple Parallel Session."() {
    given: 'A simple parallel session based condition test request'
    String parameterPath = PathUtils.reconcileKeywords('{}/resources/test/file-reader/parameter_map_style.json')
    println parameterPath
    ISession session = new SimpleParallelSession(parameterPath)
    IRequest request = session.createRequest()
    IRecord record = request.createRecord()
    String filePath = '/workspaces/messageapi/build/resources/test/inputs/file-reader/proc_sm_gtsnp_data_ftp_CF6_cf6_20190506.txt'
    record.setField('file-path', filePath)
    when: 'We submit the test session and wait for completion'
    IResponse response = request.submit()
    // Sleep briefly between completion checks instead of busy-spinning on
    // isComplete(); the empty-bodied loop burned a full core while the
    // parallel session was doing its work on the common pool.
    while (!response.isComplete()) {
        Thread.sleep(10)
    }
    then: 'We should have no rejections and there should be 79794 records in the return set.'
    response.getRejections().size() == 0
    response.getRecords().size() == 79794
}
}
......@@ -67,7 +67,7 @@ public class ParallelResponse extends BaseResponse implements IResponse {
*/
CompletableFuture<IPacket> validate(final ISchema schema, final List<IRecord> records) {
return CompletableFuture.supplyAsync(() -> {
final IPacket packet = PacketUtils.create(this.request.getSchema(), this.request.getRecords());
final IPacket packet = PacketUtils.createInParallel(this.request.getSchema(), this.request.getRecords());
this.setRejections(packet.getRejections());
return packet;
});
......@@ -87,7 +87,7 @@ public class ParallelResponse extends BaseResponse implements IResponse {
CompletableFuture<List<IContainerRecord>> factor(final ISchema schema, final IContainer container,
        final IRecord requestRecord, final IPacket packet) {
    // Convert the validated schema records into container records on a worker
    // thread, delegating to the parallel conversion path.
    return CompletableFuture.supplyAsync(
            () -> ContainerUtils.convertSchemaRecordsInParallel(schema, container, requestRecord, packet.getRecords()));
}
......@@ -104,7 +104,7 @@ public class ParallelResponse extends BaseResponse implements IResponse {
*/
CompletableFuture<List<IProtocolRecord>> prepare(final IProtocol protocol, final List<IContainerRecord> records) {
    // Hand the container records to the protocol layer asynchronously,
    // producing one protocol record per connection via the parallel converter.
    return CompletableFuture.supplyAsync(
            () -> ProtocolUtils.convertContainerRecordsInParallel(protocol, records));
}
......
......@@ -42,6 +42,29 @@ public class ContainerUtils {
}).collect(Collectors.toList());
}
/**
 * Converts a set of schema records to container records according to the
 * specified container IN PARALLEL. This involves first creating a record blueprint based on
 * the container definition (which contains field sets (called field units), and
 * transformation sets (called transformation units), and then duplicating the
 * record blueprint for every schema record and copying values from those fields
 * to any matching field unit. Uses the parallelStreams mechanism to do the conversion.
 *
 * @param schema        The schema used to validate collection conditions on
 *                      each converted record
 * @param container     The container containing definitions of field units and
 *                      transformation units
 * @param requestRecord The originating request record, passed through to
 *                      collection-condition validation
 * @param schemaRecords The schema records to be converted to container records
 * @return A list of container records
 */
public static List<IContainerRecord> convertSchemaRecordsInParallel(final ISchema schema, final IContainer container,
final IRecord requestRecord, final List<IRecord> schemaRecords) {
// One shared template; each mapped record works on its own getCopy() of it.
final IContainerRecord containerRecordTemplate = createRecordTemplate(container);
return schemaRecords.parallelStream().map(schemaRecord -> {
final IContainerRecord containerRecord = CollectionUtils.setFieldValues(containerRecordTemplate.getCopy(),
schemaRecord.getFields());
return CollectionUtils.validateCollectionConditions(schema, containerRecord, requestRecord);
}).collect(Collectors.toList());
}
/**
* Creates an empty record for use in record conversion by copy. Uses the
* definition of the provided container to create a new empty record object,
......
......@@ -36,6 +36,19 @@ public class PacketUtils {
return dataPacket;
}
/**
 * Parallel counterpart of create: assembles a data packet from the given
 * records by (1) rejecting records missing required fields, (2) filtering the
 * survivors' condition sets, (3) rejecting records whose field values fail
 * their conditions, and finally loading the surviving records plus the
 * combined rejections into a new packet. Each stage uses the parallel-stream
 * variants of the underlying utilities.
 *
 * @param schema  The schema providing condition/operator context
 * @param records The candidate records to validate and package
 * @return A packet holding the accepted records and all rejections
 */
public static IPacket createInParallel(final ISchema schema, final List<IRecord> records) {
    final List<IRejection> fieldRejections = RejectionUtils.getRequiredFieldRejectionsInParallel(records);
    final List<IRecord> survivors = SchemaUtils.filterFieldlessConditionsInParallel(
            SchemaUtils.filterNonValuedConditionsInParallel(
                    SchemaUtils.filterRejectionsInParallel(records, fieldRejections)));
    final List<IRejection> conditionRejections = RejectionUtils.getFieldConditionRejectionsInParallel(schema,
            survivors);
    final IPacket packet = new DefaultPacket();
    packet.setRecords(SchemaUtils.filterRejectionsInParallel(survivors, conditionRejections));
    packet.setRejections(ListUtils
            .flatten(new ArrayList<List<IRejection>>(Arrays.asList(fieldRejections, conditionRejections))));
    return packet;
}
public static IPacket combineResults(final List<IPacket> packets) {
final List<IRecord> allRecords = ListUtils
.flatten(packets.stream().map(packet -> packet.getRecords()).collect(Collectors.toList()));
......
......@@ -46,6 +46,25 @@ public class ProtocolUtils {
}).collect(Collectors.toList());
}
/**
 * Parallel variant of convertContainerRecords: converts a list of
 * containerized (factored into collections) records into protocol records for
 * use by endpoint connections. A protocol record has a one-to-one relationship
 * with a connection — exactly one protocol record is created for every
 * connection held by the given protocol, with the connections mapped by a
 * parallel stream.
 *
 * @param protocol         The protocol holding connection instances
 * @param containerRecords A list of container Records to be parsed,
 *                         categorized, and loaded into protocol records
 * @return A list of protocol records, one per connection
 */
public static List<IProtocolRecord> convertContainerRecordsInParallel(final IProtocol protocol,
        final List<IContainerRecord> containerRecords) {
    return protocol.getConnections().parallelStream()
            .map(connection -> createProtocolRecordInParallel(connection, containerRecords))
            .collect(Collectors.toList());
}
/**
* The singular method called by convertContainerRecords, accepts a single
* connection and a list of container records, and creates a protocol record
......@@ -73,6 +92,33 @@ public class ProtocolUtils {
return new ProtocolRecord(connection, MapUtils.mergeMapList(recordList));
}
/**
 * The singular method called by convertContainerRecordsInParallel: accepts a
 * single connection and a list of container records, and creates the protocol
 * record for that connection based on what the connection specified as data
 * needs. The container records are traversed IN PARALLEL.
 *
 * Only the outer record stream is parallel; the per-record collection stream
 * is deliberately sequential — nesting a second parallelStream inside an
 * already-parallel stream schedules the tiny inner lists onto the same
 * ForkJoin common pool, adding overhead without additional parallelism.
 *
 * @param connection       A connection containing information on what type of
 *                         records it needs (classifications, collections,
 *                         transformations)
 * @param containerRecords A list of container records that will be the basis
 *                         for records added to the protocol record
 * @return A protocol record, containing a record map based on the
 *         containerRecords, for the given connection
 */
public static IProtocolRecord createProtocolRecordInParallel(final IConnection connection,
        final List<IContainerRecord> containerRecords) {
    final List<String> connCollections = connection.getCollections();
    final Map<String, List<Object>> connClassifiers = connection.getClassifiers();
    final List<Map<IRecord, Map<String, Object>>> recordList = ListUtils
            .flatten(containerRecords.parallelStream().map(containerRecord -> {
                final UUID recordId = containerRecord.getId();
                // Sequential inner stream: see class note above on nested parallelism.
                return ListUtils.removeAllNulls(containerRecord.getCollections().stream().map(collection -> {
                    return convertCollectionToRecord(collection, recordId, connCollections, connClassifiers);
                }).collect(Collectors.toList()));
            }).collect(Collectors.toList()));
    return new ProtocolRecord(connection, MapUtils.mergeMapList(recordList));
}
/**
* Creates a new collection record map entry for the IProtocolRecord map. This
* entry has a key of the record itself (created from the collection field set),
......
......@@ -45,6 +45,31 @@ public class RejectionUtils {
return new ArrayList<IRejection>();
}
/**
 * Creates a new list of rejections for the given input record set, examining
 * the records IN PARALLEL. A record is rejected when any of its required
 * fields fails validation; the rejection carries one reason per failing field.
 *
 * @param records Records to analyze/determine rejections
 * @return A list of rejections created for the input records.
 */
public static List<IRejection> getRequiredFieldRejectionsInParallel(final List<IRecord> records) {
    final List<IRejection> rejections = records.parallelStream().map(r -> {
        // Keep the per-record field scan sequential: nesting a second
        // parallelStream over the small field list inside the already-parallel
        // record stream only adds common-pool scheduling overhead.
        final List<String> reasons = r.getFields().stream().map(f -> {
            if (!FieldUtils.validateRequired(f)) {
                return ReasonUtils.getMissingRequiredFieldReason(f);
            }
            return null;
        }).collect(Collectors.toList());
        if (!ListUtils.isAllNulls(reasons)) {
            return new DefaultRejection(r, ListUtils.removeAllNulls(reasons));
        }
        // Null marks "no rejection"; nulls are stripped below.
        return null;
    }).collect(Collectors.toList());
    if (!ListUtils.isAllNulls(rejections)) {
        return ListUtils.removeAllNulls(rejections);
    }
    return new ArrayList<IRejection>();
}
/**
* Return a list of rejections for a given record list by looking at each record
* individually, comparing the field values to conditions, and creating a
......@@ -68,4 +93,28 @@ public class RejectionUtils {
return new ArrayList<IRejection>();
}
/**
 * Return a list of rejections for a given record list by looking at each
 * record individually, comparing the field values to the record's top-level
 * conditions, and creating a rejection for any record whose field values do
 * not meet those conditions. Records are evaluated IN PARALLEL.
 *
 * @param schema  A schema holding the comparison operator factory
 * @param records A list of record to use for rejection assembly
 * @return A list of rejections based on invalid records
 */
public static List<IRejection> getFieldConditionRejectionsInParallel(final ISchema schema, final List<IRecord> records) {
    final List<IRejection> candidates = records.parallelStream().map(record -> {
        final boolean valid = FieldUtils.validateConditions(schema, record,
                ConditionUtils.getTopLevelConditions(record));
        // Null marks a record with no rejection; nulls are stripped below.
        return valid ? null : new DefaultRejection(record, ReasonUtils.getInvalidFieldReason());
    }).collect(Collectors.toList());
    if (ListUtils.isAllNulls(candidates)) {
        return new ArrayList<IRejection>();
    }
    return ListUtils.removeAllNulls(candidates);
}
}
......@@ -41,6 +41,26 @@ public class SchemaUtils {
return records;
}
/**
 * Returns a new list of IRecords based on the passed IRecord list with those
 * IRecords that are also contained in the rejections removed. Both the
 * rejection extraction and the record filtering run in parallel.
 *
 * @param records    A list of Records to filter
 * @param rejections A list of rejections to filter from the Records (a null
 *                   list means nothing to filter and returns records as-is)
 * @return A new list of records minus the rejections
 */
public static List<IRecord> filterRejectionsInParallel(final List<IRecord> records, final List<IRejection> rejections) {
    if (rejections != null) {
        // Collect the rejected records into a Set so each membership test in
        // the filter below is O(1) instead of a linear List.contains scan,
        // which made this step O(records x rejections).
        // NOTE(review): assumes IRecord equals/hashCode are consistent
        // (identity semantics satisfy this) — confirm no equals-only override.
        final java.util.Set<IRecord> rejectedRecords = rejections.parallelStream()
                .map(r -> r.getRecord())
                .collect(Collectors.toSet());
        return records.parallelStream()
                .filter(r -> !rejectedRecords.contains(r))
                .collect(Collectors.toList());
    }
    return records;
}
/**
* Returns a new list of IRecords based on the passed IRecord list with all
* non-valued fields (fields without values) removed from each record.
......@@ -67,10 +87,22 @@ public class SchemaUtils {
public static List<IRecord> filterNonValuedConditions(final List<IRecord> records) {
    // NOTE(review): this mutates each record in place via setConditions rather
    // than working on a copy (the commented-out getCopy() variant was removed
    // as dead code); callers receive the same record instances with their
    // non-valued conditions stripped.
    return records.stream().map(r -> {
        final List<ICondition> conditions = ConditionUtils.filterNonValuedConditions(r.getConditions());
        r.setConditions(conditions);
        return r;
    }).collect(Collectors.toList());
}
/**
 * Returns the passed IRecord list with all non-valued conditions (conditions
 * without values) removed from each record, processing the records IN
 * PARALLEL.
 *
 * NOTE(review): like its sequential counterpart, this mutates each record in
 * place via setConditions rather than copying.
 *
 * @param records The list of records to process
 * @return The records with non-valued conditions removed.
 */
public static List<IRecord> filterNonValuedConditionsInParallel(final List<IRecord> records) {
    // Parallelize over the (potentially large) record list and filter each
    // record's small condition list sequentially. The original did the
    // reverse — a sequential outer stream delegating to a parallel inner
    // filter — which left the record traversal serial and parallelized only
    // the tiny inner lists.
    return records.parallelStream().map(r -> {
        final List<ICondition> conditions = ConditionUtils.filterNonValuedConditions(r.getConditions());
        r.setConditions(conditions);
        return r;
    }).collect(Collectors.toList());
}
......@@ -92,5 +124,22 @@ public class SchemaUtils {
}).collect(Collectors.toList());
}
/**
 * Produces a new list of IRecords in which every condition referring to a
 * record field that no longer exists has been dropped. Each input record is
 * copied (via getCopy) before its condition list is replaced, and the records
 * are processed with a parallel stream.
 *
 * @param records The list of records to process
 * @return A list of new IRecords with fieldless conditions removed.
 */
public static List<IRecord> filterFieldlessConditionsInParallel(final List<IRecord> records) {
    return records.parallelStream().map(original -> {
        final IRecord copy = original.getCopy();
        copy.setConditions(ConditionUtils.filterFieldlessConditions(original));
        return copy;
    }).collect(Collectors.toList());
}
}
......@@ -61,6 +61,17 @@ public class ConditionUtils {
return conditions.stream().filter(c -> c.getValue() != null).collect(Collectors.toList());
}
/**
 * Returns a subset of the input conditions list by filtering out those
 * conditions without a set value, evaluating the list with a parallel stream.
 *
 * @param conditions The input condition set to filter
 * @return A list of the filtered conditions containing only those with values.
 */
public static List<ICondition> filterNonValuedConditionsInParallel(final List<ICondition> conditions) {
return conditions.parallelStream().filter(c -> c.getValue() != null).collect(Collectors.toList());
}
/**
* For a given record, filters all conditions corresponding to a field that is
* no longer attached to a record. This is done by first assembling a current
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment