Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(libooniengine): add support for running tasks #1112

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
61 changes: 61 additions & 0 deletions internal/engine/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,63 @@ func NewSession(ctx context.Context, config SessionConfig) (*Session, error) {
return sess, nil
}

func NewSessionWithoutTunnel(ctx context.Context, config *SessionConfig) (*Session, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not convinced we need this new constructor

if config.Logger == nil {
return nil, errors.New("Logger is empty")
}
if config.SoftwareName == "" {
return nil, errors.New("SoftwareName is empty")
}
if config.SoftwareVersion == "" {
return nil, errors.New("SoftwareVersion is empty")
}
if config.KVStore == nil {
config.KVStore = &kvstore.Memory{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not mutate a config passed by pointer.

}
// Implementation note: if config.TempDir is empty, then Go will
// use the temporary directory on the current system. This should
// work on Desktop. We tested that it did also work on iOS, but
// we have also seen on 2020-06-10 that it does not work on Android.
tempDir, err := ioutil.TempDir(config.TempDir, "ooniengine")
if err != nil {
return nil, err
}
config.Logger.Infof(
"ooniprobe-engine/v%s %s dirty=%s %s",
version.Version,
runtimex.BuildInfo.VcsRevision,
runtimex.BuildInfo.VcsModified,
runtimex.BuildInfo.GoVersion,
)
sess := &Session{
availableProbeServices: config.AvailableProbeServices,
byteCounter: bytecounter.New(),
kvStore: config.KVStore,
logger: config.Logger,
queryProbeServicesCount: &atomic.Int64{},
softwareName: config.SoftwareName,
softwareVersion: config.SoftwareVersion,
tempDir: tempDir,
torArgs: config.TorArgs,
torBinary: config.TorBinary,
tunnelDir: config.TunnelDir,
}
var proxyURL *url.URL = nil
sess.proxyURL = proxyURL
sess.resolver = &sessionresolver.Resolver{
ByteCounter: sess.byteCounter,
KVStore: config.KVStore,
Logger: sess.logger,
ProxyURL: proxyURL,
}
txp := netxlite.NewHTTPTransportWithLoggerResolverAndOptionalProxyURL(
sess.logger, sess.resolver, sess.proxyURL,
)
txp = bytecounter.WrapHTTPTransport(txp, sess.byteCounter)
sess.httpDefaultTransport = txp
return sess, nil
}

// TunnelDir returns the persistent directory used by tunnels.
func (s *Session) TunnelDir() string {
return s.tunnelDir
Expand Down Expand Up @@ -393,6 +450,10 @@ func (s *Session) MaybeLookupBackends() error {
return s.MaybeLookupBackendsContext(context.Background())
}

func (s *Session) Resolver() *sessionresolver.Resolver {
return s.resolver
}

// ErrAlreadyUsingProxy indicates that we cannot create a tunnel with
// a specific name because we already configured a proxy.
var ErrAlreadyUsingProxy = errors.New(
Expand Down
21 changes: 21 additions & 0 deletions internal/libooniengine/emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

//
// Emitter
//

// taskEmitter implements taskMaybeEmitter.
type taskChanEmitter struct {
// out is the channel where we emit events.
out chan *response
}

var _ taskMaybeEmitter = &taskChanEmitter{}

// maybeEmitEvent implements taskMaybeEmitter.maybeEmitEvent.
func (e *taskChanEmitter) maybeEmitEvent(resp *response) {
select {
case e.out <- resp:
default: // buffer full, discard this event
}
}
30 changes: 0 additions & 30 deletions internal/libooniengine/engine.go

This file was deleted.

55 changes: 54 additions & 1 deletion internal/libooniengine/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
/// C API for using the OONI engine.
///

#include <stdint.h>

/// OONITask is an asynchronous thread of execution managed by the OONI
/// engine that performs a background operation and emits meaningful
/// events such as, for example, the results of measurements.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description should be updated to mention that we emit interim outputs such as logs and progress as well as the result of the operation.

typedef uintptr_t OONITask;

#ifdef __cplusplus
extern "C" {
#endif
Expand All @@ -19,7 +26,53 @@ char *OONIEngineVersion(void);
/// OONIEngineFreeMemory frees the memory allocated by the engine.
///
/// @param ptr a void pointer refering to the memory to be freed.
void OONIENgineFreeMemory(void *ptr);
void OONIEngineFreeMemory(void *ptr);

/// OONIEngineCall starts a new OONITask using the given [req].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I wrote the prototype, I used [req] but it seems the right syntax when using Doxygen is @p req. Do you mind updating the documentation in this file?

///
/// @param req A JSON string, owned by the caller, that
/// contains the configuration for the task to start.
///
/// @return Zero on failure, nonzero on success. If the return value
/// is nonzero, a task is running. In such a case, the caller is
/// responsible to eventually dispose of the task using OONIEngineFreeMemory.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure we can use OONIEngineFreeMemory to dispose of a task? I thought we should be having a dedicated API that removes the corresponding handle from the corresponding map.

OONITask OONIEngineCall(char *req);

/// OONIEngineWaitForNextEvent awaits on the [task] event queue until
/// a new event is available or the given [timeout] expires.
///
/// @param task Task handle returned by OONIEngineCall.
///
/// @param timeout Timeout in milliseconds. If the timeout is zero
/// or negative, this function would potentially block forever.
///
/// @return A NULL pointer on failure, non-NULL otherwise. If the return
/// value is non-NULL, the caller takes ownership of the OONIMessage
/// pointer and MUST free it using OONIMessageFree when done using it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this say OONIEngineFreeMemory?

///
/// This function will return NULL:
///
/// 1. when the timeout expires;
///
/// 2. if [task] is done;
///
/// 3. if [task] is zero or does not refer to a valid task;
///
/// 4. if we cannot protobuf serialize the message;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're using JSON now, so this text should be adapted.

///
/// 5. possibly because of other unknown internal errors.
///
/// In short, you cannot reliably determine whether a task is done by
/// checking whether this function has returned NULL.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did the original API include a function for checking whether a task is done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thank you for pointing this out (I missed it). I have implemented this in future commits (along with the go code)

char *OONIEngineWaitForNextEvent(OONITask task, int32_t timeout);

/// OONIEngineInterrupt tells [task] to stop ASAP.
///
/// @param task Task handle returned by OONIEngineCall. If task is zero
/// or does not refer to a valid task, this function will just do nothing.
void OONIEngineInterrupt(OONITask task);

// OONITask
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// OONITask


#ifdef __cplusplus
}
Expand Down
67 changes: 67 additions & 0 deletions internal/libooniengine/geolocate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"context"

"github.com/ooni/probe-cli/v3/internal/geolocate"
)

func init() {
taskRegistry["Geolocate"] = &geolocateTaskRunner{}
}

// geolocateOptions contains the request arguments for the Geolocate task.
type geolocateOptions struct {
SessionId int64 `json:",omitempty"`
}

// geolocateResponse is the response for the Geolocate task.
type geolocateResponse struct {
ASN uint `json:",omitempty"`
CountryCode string `json:",omitempty"`
NetworkName string `json:",omitempty"`
ProbeIP string `json:",omitempty"`
ResolverASN uint `json:",omitempty"`
ResolverIP string `json:",omitempty"`
ResolverNetworkName string `json:",omitempty"`
Error string `json:",omitempty"`
}

type geolocateTaskRunner struct{}

var _ taskRunner = &geolocateTaskRunner{}

// main implements taskRunner.main
func (tr *geolocateTaskRunner) main(ctx context.Context,
emitter taskMaybeEmitter, req *request, resp *response) {
logger := newTaskLogger(emitter)
sessionId := req.Geolocate.SessionId
sess := mapSession[sessionId]
if sess == nil {
logger.Warnf("session: %s", errInvalidSessionId.Error())
resp.Geolocate.Error = errInvalidSessionId.Error()
return
}
gt := geolocate.NewTask(geolocate.Config{
Logger: sess.Logger(),
Resolver: sess.Resolver(),
UserAgent: sess.UserAgent(),
})
results, err := gt.Run(ctx)
if err != nil {
logger.Warnf("geolocate: %s", err.Error())
resp.Geolocate.Error = err.Error()
return
}
resp = &response{
Geolocate: geolocateResponse{
ASN: results.ASN,
CountryCode: results.CountryCode,
NetworkName: results.NetworkName,
ProbeIP: results.ProbeIP,
ResolverASN: results.ResolverASN,
ResolverIP: results.ResolverIP,
ResolverNetworkName: results.ResolverNetworkName,
},
}
}
88 changes: 88 additions & 0 deletions internal/libooniengine/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"fmt"

"github.com/ooni/probe-cli/v3/internal/model"
)

type LogLevel int32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot these be strings? I think it's easier for a consumer if these are strings.


const (
// The DEBUG log level.
logLevel_DEBUG LogLevel = 0
// The INFO log level.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// The INFO log level.
// The INFO log level.

logLevel_INFO LogLevel = 1
// The WARNING log level.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// The WARNING log level.
// The WARNING log level.

logLevel_WARNING LogLevel = 2
)

type logResponse struct {
Level LogLevel `json:",omitempty"`
Message string `json:",omitempty"`
}

// taskLogger implements model.Logger for tasks.
type taskLogger struct {
// emitter is used to emit log events.
emitter taskMaybeEmitter

// verbose indicates whether verbose logging is enabled.
verbose bool
}

// newLogger creates a new taskLogger instance using
// the [emitter] to emit log events.
func newTaskLogger(emitter taskMaybeEmitter) *taskLogger {
return &taskLogger{
emitter: emitter,
verbose: false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor should also initialize verbose.

}
}

var _ model.Logger = &taskLogger{}

// Debugf implements model.Logger.Debugf.
func (tl *taskLogger) Debugf(format string, values ...any) {
if tl.verbose {
tl.emit(logLevel_DEBUG, fmt.Sprintf(format, values...))
}
}

// Debug implements model.Logger.Debug.
func (tl *taskLogger) Debug(message string) {
if tl.verbose {
tl.emit(logLevel_DEBUG, message)
}
}

// Infof implements model.Logger.Infof.
func (tl *taskLogger) Infof(format string, values ...any) {
tl.emit(logLevel_INFO, fmt.Sprintf(format, values...))
}

// Info implements model.Logger.Info.
func (tl *taskLogger) Info(message string) {
tl.emit(logLevel_INFO, message)
}

// Warnf implements model.Logger.Warnf.
func (tl *taskLogger) Warnf(format string, values ...any) {
tl.emit(logLevel_WARNING, fmt.Sprintf(format, values...))
}

// Warn implements model.Logger.Warn.
func (tl *taskLogger) Warn(message string) {
tl.emit(logLevel_WARNING, message)
}

// emit emits a log message.
func (tl *taskLogger) emit(level LogLevel, message string) {
logResp := &response{
Logger: logResponse{
Level: level,
Message: message,
},
}
tl.emitter.maybeEmitEvent(logResp)
}
Loading