/* Herein we define a class which wraps the standard procedure for using an EvalEngine in post-processing alog files At its core, it will maintain an accelerated state machine following the same events which arose in realtime. Generally, we have to assume that it was used as intended, which the Engine's contribution to the MOOSDB does not render any behavioral change in an agent which would have changed how a mission runs. An EvalEngine is purely for: 1 )real time evaluation and analysis, 2) displaying associated graphical changes in pMarineViewer (i.e. marking where and event occurred), 3) post processing an alog file which did not have real time evaluations running which can be added and viewed in alogview later, 4) obtaining the final results of any kernels (their latest publications) -> We take an alog file, and a MOOS file with the complete process configuration block -> We handle the parameters -> We enter the equivalent runtime state machine managing "OnNewMail" and "Iterate" -> The app is provided an app tick, where for the number of allowable increments throughout the mission we maintain an equivalent apptick/mailtick cycle. i.e. If the mailtick is 10, then we push new mail through these equivalent intervals if the apptick is 5, then after we've collected mail twice in the buffer, we run the evaluate cycle before clearing the infobuffer which is otherwise maintained. While we move forward, we build up the latest info buffer queue as we would in realtime. Essentially, we keep reading the lines in the alogfile until either of these events are triggered, or until we reach the end -> At the end of the OnNewMail cycle, the queue is cleared -> If it is time to revisit the iterate loop, we step into the loop, evaluate once, collect the update buffer, and write them to the alog file with the same time Desired interface and behavior ./ee target_directory config.moos {opt}--pname=pEvalEngine {opt}--tag={default=ee} --rewrite If rewriting and not reporting the latest values target_directory/ *.alog *.blog *.ylog *.xlog *._bhv *._moos target_directory_{tag+_}{YYYY-MM-DD-HH:MM:SS}/ *{_tag}.alog - rewritten alog file {appName}._moos - The EvalEngine post processors configuration block (since all else is the same in the previous other moos file) */ #pragma once #include "EvalEngine.h" #include "MOOS/libMOOS/Utils/ProcessConfigReader.h" #include <fstream> #include <sstream> #include <sys/stat.h> #include <iostream> #include "LogUtils.h" #include "MBUtils.h" #include <iterator> #include <sys/stat.h> #include <dirent.h> #include <set> #include <cstdarg> //va_list, va_start, va_end class AlogFileInfo { public: std::string log_start_time; std::string fname; uint64_t num_lines; uint64_t num_badlines; uint64_t num_changedlines; uint64_t num_newlines; double filesize_percent_change; }; bool isDirectory(std::string dirname) { DIR *dir = opendir(dirname.c_str()); if (dir) { closedir(dir); return true; } else { return false; } } bool containsAlogFile(std::string dirname) { DIR *dir = opendir(dirname.c_str()); std::string alog_suffix = ".alog"; if (dir) { struct dirent *entity; while ((entity = readdir(dir)) != nullptr) { std::string filename = entity->d_name; if (filename.size() > alog_suffix.size() && filename.compare(filename.size() - alog_suffix.size(), alog_suffix.size(), alog_suffix) == 0) { closedir(dir); return true; } } closedir(dir); } return false; } bool isValidDirectory(std::string dirname) { if (isDirectory(dirname)) { return containsAlogFile(dirname); } else { return false; } } std::string getAlogFileName(std::string dirname) { // To get this far, we already confirmed that an alog exists DIR *dir = opendir(dirname.c_str()); std::string alog_suffix = ".alog"; struct dirent *entity; while ((entity = readdir(dir)) != nullptr) { std::string filename = entity->d_name; if (filename.size() > alog_suffix.size() && filename.compare(filename.size() - alog_suffix.size(), alog_suffix.size(), alog_suffix) == 0) { closedir(dir); filename = dirname + filename; return filename; } } } // TODO: Monitor AlogFileInfo, in terms of the original information in the file, as well as the difference w/rt the new output file if we are rewriting class EvalEnginePostProcessor { public: EvalEnginePostProcessor(EvalEngine *engine, std::string appName) { // Store the engine m_engine = engine; m_appName = appName; m_engineParameters = m_engine->getParameterKeys(); m_engineSubscriptions = m_engine->getSubscriptions(); m_enginePublications = m_engine->getPublications(); //When outputting a final Evaluation/latest kernel publications, we exclude visual artifacts, as a convenience m_output_x_list.insert("TRAIL_RESET"); m_output_x_list.insert("VIEW_CIRCLE"); m_output_x_list.insert("VIEW_COMMS_PULSE"); m_output_x_list.insert("VIEW_GRID"); m_output_x_list.insert("VIEW_GRID_CONFIG"); m_output_x_list.insert("VIEW_GRID_DELTA"); m_output_x_list.insert("VIEW_POINT"); m_output_x_list.insert("VIEW_POLYGON"); m_output_x_list.insert("VIEW_SEGLIST"); m_output_x_list.insert("VIEW_MARKER"); m_output_x_list.insert("VIEW_RANGE_PULSE"); m_output_x_list.insert("VIEW_VECTOR"); } bool setParam(std::string param_name, std::string param_val) { // Update the internal parameter dictionary bool handled = false; if (param_name == m_verbose_key) { m_verbose = true; handled = true; } else if (param_name == m_rewrite_key) { m_rewrite = true; handled = true; } else if (param_name == m_tag_key) { m_tag = param_val; m_param_lookup[m_tag_key] = m_tag; handled = true; } else if (param_name == m_quiet_key) { m_quiet = true; handled = true; } else if (param_name == m_process_name_key) { m_pname = param_val; m_param_lookup[m_process_name_key] = m_pname; handled = true; } else if (param_name == m_target_directory_key) { m_target_directory = param_val; if (m_target_directory.back() != '/') { m_target_directory += "/"; } m_param_lookup[m_target_directory_key] = m_target_directory; handled = setParam(m_input_alog_file_key, getAlogFileName(m_target_directory)); } else if (param_name == m_input_alog_file_key) { m_input_alogf = param_val; m_param_lookup[m_input_alog_file_key] = m_input_alogf; if (fileExists(m_input_alogf)) { handled = true; } else { std::cerr << ".alog file does not exist < " << m_input_alogf << " >" << std::endl; } } else if (param_name == m_moos_file_key) { m_input_moosf = param_val; m_param_lookup[m_moos_file_key] = m_input_moosf; if (m_MissionReader.SetFile(m_input_moosf)) { handled = true; } else { std::cerr << "Mission file not found: " << m_input_moosf << std::endl; } } return handled; } void process() { m_param_lookup[m_verbose_key] = m_tag; m_param_lookup[m_process_name_key] = m_pname; m_param_lookup[m_input_alog_file_key] = m_input_alogf; m_param_lookup[m_moos_file_key] = m_input_moosf; // Use the mission reader and extract the configuration block for the associated EvalEngine getConfigParams(); // Initialize EvalEngine m_engine->setIdentity(m_pname); m_engine->setTime(0); bool handled = false; double apptick_time_s = 1 / m_apptick; double commtick_time_s = 1 / m_commtick; // The half the lesser of the two tick rates is the frequency in which we'll run our state machine double write_period_s = apptick_time_s < commtick_time_s ? apptick_time_s / 2.0 : commtick_time_s / 2.0; m_engine->Initialize(); // If rewriting, make the directory and the file pointer, also dump the new MOOS file with updated parameters? if (m_rewrite) { // Get the absolute time in which the log file was opened m_logstart = getLogStartStr(); // TODO: Add execution time for this post processor within the directory name so there are two discriminators // MOOSLog_DD_MM_YYYY_____HH_MM_SS/MOOSLog_DD_MM_YYYY_____HH_MM_SS_{m_tag}/ // We do this since you are not making a new mission, just rewriting what would have been realtime calculations, so it is still part of the original mission directory m_output_directory = m_target_directory + m_target_directory.substr(0, m_target_directory.size() - 1) + "_" + m_tag + "/"; makeDirectory(m_output_directory); makeAlogFile(); cacheMOOSConfig(m_input_moosf); makeMOOSFile(); } // Initialize states and latest updates for state machine std::map<std::string, EEMessage> mail; std::vector<std::pair<std::string, std::string> > updates; std::map<std::string, std::string> latest_updates_exclusive; std::map<std::string, std::string> prev_updates; bool done = false; std::ifstream file(m_input_alogf); std::string line; double next_mailupdate = 0; double next_evaluation = 0; double c_time = 0; bool first_read = true; // If the file is open, we proceed to go through the file if (file.is_open()) { while (std::getline(file, line)) { // While we are not at the end of the file // We remove lines which are comments or empty if (line.size() == 0 || line.at(0) == '%') { continue; } // We get the TimeStamp to update our state machine // TODO: Do error checking on TimeStamp // TODO: EEMessage = getMessage(line) (Reduce this to a single function which converts a line into a message) std::string timestamp = getTimeStamp(line); // We check the message to see if it came from our EvalEngine process name argument, if it does, we ignore it std::string srcname = getSourceNameNoAux(line); if (srcname == m_pname) { // This line was published by the previous EvalEngine process, so we ignore it continue; } if (m_rewrite) { std::ofstream file(m_output_alogf, std::ios::app); if (file.is_open()) { file << line << std::endl; } else { std::cerr << "Cannot open output alog file: " << m_output_alogf << std::endl; exit(1); } file.close(); } // We get the variable name, and the value std::string varname = getVarName(line); std::string value = getDataEntry(line); EEMessage msg; double time_stamp = stod(timestamp); if (time_stamp < 0) continue; // TODO: Need to compose more/better constructors for EEMessage // We accrue mail until it is the apps next time to process mail msg.name = varname; msg.timestamp = time_stamp; msg.src = srcname; msg.contents_str = value; mail[varname] = msg; // If it is time to handle new mail which has built up in the queue if we have new mail, then we update all the kernel variables and then schedule the next time in which we update variables // The state machine time also has to at least be in sync with the alog files previous logs in order to see if there have been viable updates // With this latest update in the buffer, let the state machine timer run until it is time to handle the new mail while processing back end updates // When running the approximate mailupdate, and if we have new mail, we let the kernels update all their variables if (c_time > time_stamp) c_time = time_stamp; do { if (time_stamp >= next_mailupdate && (mail.size() != 0)) { m_engine->setTime(c_time); m_engine->UpdateVariables(mail); mail.clear(); next_mailupdate = c_time + commtick_time_s; } // If it is time to write/publish another evaluation, then we write the latest updates to the file while scheduling the next evaluation time if (c_time >= next_evaluation) { // Get all the new updates, and write them as an alogfile updates = m_engine->ComputeEvaluations(); std::vector<std::pair<std::string, std::string> >::iterator it; for (it = updates.begin(); it != updates.end(); ++it) { latest_updates_exclusive[it->first] = it->second; } if (m_rewrite) { std::string time = std::to_string(c_time); std::stringstream newentry; std::ofstream file(m_output_alogf, std::ios::app); if (file.is_open()) { for (it = updates.begin(); it != updates.end(); ++it) { // If this variable has previously been published to if (prev_updates.count(it->first)) { // If the content of the message was the same, we continue if (prev_updates[it->first] == it->second) continue; } newentry << std::setw(15) << std::fixed << std::setprecision(5) << std::left << c_time << " " << std::setw(20) << std::left << it->first << " " << std::setw(15) << std::left << m_pname << " " << it->second << std::endl; file << newentry.str(); prev_updates[it->first] = it->second; } } else { std::cerr << "Cannot open output alog file: " << m_output_alogf << std::endl; exit(1); } file.close(); next_evaluation = c_time + apptick_time_s; } } c_time += write_period_s; } while (c_time <= time_stamp); } } else { std::cerr << "Unable to open input file: " << m_input_alogf << std::endl; exit(1); } // TODO: Need to make an exclusion list for all visual artifacts which a kernel can publish, i.e. VIEW_SEGLIST // At this point, we now maintain the latest updates from all the kernels, and we write this to the stdout if (latest_updates_exclusive.size() != 0) { std::map<std::string, std::string>::iterator it; for (it = latest_updates_exclusive.begin(); it != latest_updates_exclusive.end(); ++it) { if (m_output_x_list.count(it->first)) continue; std::cout << it->first << "=" << it->second; if (std::next(it) != latest_updates_exclusive.end()) { std::cout << ", "; } } std::cout << std::endl; } else { std::cout << "Error: Kernel rendered no updates throughout Alog file" << std::endl; exit(1); } } private: // State variables EvalEngine *m_engine; std::string m_appName; bool m_debug = false; double m_apptick = 4.0; double m_commtick = 4.0; // TODO: Engine Publications will be changing to a topic which maintains a list of all the kernels subscribed std::set<std::string> m_engineSubscriptions; // TODO: Engine Subscriptions will be changing to a topic which maintains a list of all the kernels publishing to them std::set<std::string> m_enginePublications; std::set<std::string> m_engineParameters; std::map<std::string, std::string> m_param_lookup; CProcessConfigReader m_MissionReader; // Verbose means we will print the active status while we are reading the alog file (Optional - Default false) std::string m_verbose_key = "verbose"; bool m_verbose = false; // Rewrite means we are going to rewrite a processed alog file which allows visualization tools // and cummulative observations to be plotted with the new parameters (Optional - Default false) std::string m_rewrite_key = "rewrite"; bool m_rewrite = false; // Tag is how we will tag the rewritten file, if one is enabled (Optional - Default "_eerev") std::string m_tag_key = "tag"; std::string m_tag = "eerev"; // Quiet means we will only report the final result and nothing else (Optional - Default false) std::string m_quiet_key = "quiet"; bool m_quiet = false; // The process name is what we are looking for in the configuration block (Mandatory - no default) std::string m_process_name_key = "process_name"; std::string m_pname = "NA"; // The alog file is what we will be reading (Mandatory - no default) std::string m_input_alog_file_key = "alog_file"; std::string m_input_alogf = "NA"; // The alog file is what we will be writing to (Optional and only if we are rewriting - defaults to the name of the original file + the tag) std::string m_output_alogf = ""; // The MOOS file is where we will look for our configuration block (Mandatory - no default) std::string m_moos_file_key = "moos_file"; std::string m_target_directory_key = "target_directory"; std::string m_target_directory = "Unassigned-target-directory"; std::string m_output_directory = "Unassigned-output-directory"; std::string m_input_moosf = "Unassigned-input-moos-file"; std::string m_output_moosf = "Unassigned-output-moos-file"; std::string m_logstart = ""; std::set<std::string> m_output_x_list; std::string m_cached_moosf; protected: // Internal Methods bool check_line(std::string line) { // Might use? return true; } bool fileExists(std::string fname) { std::ifstream file(fname.c_str()); return file.good(); } bool makeDirectory(std::string dirname) { std::cout << "Making directory " << dirname << std::endl; if (mkdir(dirname.c_str(), 0700) != 0) { std::cerr << "Failed to make new directory\n"; return false; } else { return true; } } int asAscii(char c) { return static_cast<int>(c); }; bool charIsNumber(char c) { return ((asAscii(c) > asAscii('0')) && (asAscii(c) < asAscii('9'))); } std::string getLogStartStr() { std::string line; std::ifstream file(m_input_alogf); std::string logstart_time = ""; // std::function<int(char)> asAscii = [ ](char c) { }; // std::function<bool(char)> charIsNumber = [ ](char c){ }; if (file.is_open()) { // Extract the header - Terminate on an absense of '%' for (std::string line; (std::getline(file, line) && line[0] == '%');) { // Scan until we find the LOGSTART argument if (line.find("LOGSTART") != std::string::npos) { std::string tok = "LOGSTART"; // std::string garb = biteStringX(line, tok); // gets everything after token int pos = line.find(tok); line = line.substr(pos + tok.size()); // Once we find the logstart argument, we scan for all the legal numbers for (char c : line) { if (!charIsNumber(c) && c != '.') continue; logstart_time += c; } break; } else { continue; } } if (logstart_time == "") { std::cerr << "Unable to obtain log start time from alog file\n"; exit(1); } return logstart_time; } else { std::cerr << "Unable to open input alog log file\n"; exit(1); return ""; } } std::string getLogHeader(std::string fname) { std::string header; // Capture and generate the header of the alog log file std::string line; std::ifstream file(fname); int pos = 0; if (file.is_open()) { // Extract the header - Terminate on an absense of '%' for (std::string line; (std::getline(file, line) && line[0] == '%');) { // Otherwise we need express that this is a manipulated and revised file // Format the new time in which it was opened header += line + "\n"; } } else { std::cerr << "Unable to open file: " << fname << std::endl; exit(1); } return header; } bool makeAlogFile() { // This will make a new alog log file std::string alogheader = getLogHeader(m_input_alogf); std::stringstream alogheaderss(alogheader); std::string new_alogheader; // Capture and generate the header of the alog log file int pos = 0; pos = m_input_alogf.find(".alog"); m_output_alogf = m_output_directory + m_pname + "_" + m_tag + ".alog"; std::ofstream ofile(m_output_alogf); if (alogheader.size()) { // Extract the header - Terminate on an absense of '%' for (std::string line; std::getline(alogheaderss, line) && line[0] == '%';) { if (line.find("LOG FILE: ") != std::string::npos) { pos = line.find(m_input_alogf); new_alogheader += line.substr(0, pos) + m_output_alogf; } else if (line.find("FILE OPENED ON") != std::string::npos) { pos = line.find(" "); std::string lead = line.substr(0, pos); // Update the file opened line new_alogheader += lead + "<Include new time here> " + "POST PROCESSED REWRITTEN FILE - Originally from " + line + "\n"; } else { // Otherwise we need express that this is a manipulated and revised file // Format the new time in which it was opened new_alogheader += line + "\n"; } } } else { std::cerr << "No contents in header: " << alogheader << std::endl; exit(1); } ofile << alogheader << std::endl; ofile.close(); return true; } bool makeMOOSFile() { std::string new_moosheader; // Capture and generate the header of the alog log file int pos = 0; pos = m_input_moosf.find(".moos"); m_output_moosf = m_output_directory + m_input_moosf.substr(0, pos) + "_" + m_tag + "._moos"; std::ofstream ofile(m_output_moosf); if (m_cached_moosf.size()) { ofile << "//" << xChars("-", 59) << std::endl; ofile << "// Automatically generated by: <" << m_appName << "> when referencing the configuration for <" << m_pname << ">" << std::endl; ofile << "//" << xChars("-", 59) << std::endl; ofile << m_cached_moosf << std::endl; } else { exit(1); } ofile.close(); return true; } bool cacheMOOSConfig(std::string moosfile) { std::string line; std::ifstream file(moosfile); std::stringstream moosfile_contents; bool found_start_to_config = false; int stack_height = 0; int stack_max = 0; if (file.is_open()) { //Extract only the related configuration block in the .moos file to save it when rewriting while(std::getline(file, line)) { if((line.find("ProcessConfig") != std::string::npos) && (line.find(m_pname) != std::string::npos)) { std::cout << "Found start " << line << std::endl; found_start_to_config = true; } else if (!found_start_to_config) { continue; } //The stack must exceed one at least once and the next time it reaches zero, we've reached a full configuration block closed in brackets at the same level for(int i = 0; i < line.size(); ++i) { if(line[i] == '{') stack_height++; if(line[i] == '}') stack_height--; if(stack_height > stack_max) stack_max = stack_height; } if(found_start_to_config) { moosfile_contents << line << std::endl; } if(found_start_to_config && stack_height == 0 && stack_max > 0) break; } m_cached_moosf = moosfile_contents.str(); } else { std::cerr << "Unable to open input moos config file\n"; exit(1); } } std::string xChars(std::string c, int nchars) { std::string str; for (int i = 0; i < nchars; ++i) { str += c; } return str; } bool getConfigParams() { // Obtain parameter list from the process block name STRING_LIST sParams; m_MissionReader.EnableVerbatimQuoting(false); // Unlike a MOOS App, we need to manually obtain the apptick and commstick m_MissionReader.SetAppName(m_pname); m_MissionReader.GetConfigurationParam("APPTICK", m_apptick); m_MissionReader.GetConfigurationParam("COMMSTICK", m_commtick); if (!m_MissionReader.GetConfiguration(m_pname, sParams)) std::cerr << "Unhandled configuration parameter: " << m_pname << std::endl; // Similar to a MOOS App, we parse a mission with the mission reader STRING_LIST::iterator p; for (p = sParams.begin(); p != sParams.end(); ++p) { std::string orig = *p; std::string line = *p; std::string param = tolower(biteStringX(line, '=')); std::string value = line; bool handled = false; if (m_engineParameters.count(param)) { handled = m_engine->setParameter(param, value); } else if (param == "debug") { // TODO: Actually update the debug state of the eval engine m_debug = (value == tolower("true")) ? true : false; if (m_debug) { m_engine->set_debug_stream("garb.txt"); } handled = true; } else if (param == "commstick") { m_commtick = stod(value); handled = true; } else if (param == "apptick") { m_apptick = stod(value); handled = true; } if (!handled) std::cerr << "Unhandled configuration parameter: " << orig << std::endl; } } public: bool v; };