diff --git a/nifi/client/client.go b/nifi/client/client.go index e753376..787b053 100644 --- a/nifi/client/client.go +++ b/nifi/client/client.go @@ -121,61 +121,96 @@ func (c *Client) GetProcessGroups(parentID string) ([]ProcessGroupEntity, error) // GetConnections traverses the process group hierarchy returning information about // all connections func (c *Client) GetConnections(parentID string) ([]ConnectionEntity, error) { - var entity ConnectionsEntity - if err := c.getDeepConnections(parentID, &entity); err != nil { - return nil, err + results := make(chan ConnectionEntity, 1) + var entities []ConnectionEntity + // Results accumelator + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for entity := range results { + entities = append(entities, entity) + } + }() + + // Start the tree walk + if err := c.getDeepConnections(results, parentID); err != nil { + log.Errorf("failed to get id %s", parentID, err) } - return entity.Connections, nil + close(results) + wg.Wait() + return entities, nil } -func (c *Client) getDeepConnections(parentID string, connectionsEntity *ConnectionsEntity) error { +func (c *Client) getDeepConnections(results chan ConnectionEntity, parentID string) error { var entity ConnectionsEntity // Get the connections for the current process group if err := c.request("/process-groups/"+parentID+"/connections", nil, &entity); err != nil { return errors.Trace(err) } - + for _, entity := range entity.Connections { + results <- entity + } // And the child process groups - var pgentity ProcessGroupsEntity if err := c.request("/process-groups/"+parentID+"/process-groups", nil, &pgentity); err != nil { return errors.Trace(err) } + var wg sync.WaitGroup for _, pg := range pgentity.ProcessGroups { - if err := c.getDeepConnections(pg.ID, connectionsEntity); err != nil { - return err + wg.Add(1) + go func(wg *sync.WaitGroup, id string) { + defer wg.Done() + // FIXME: We aren't collecting errors, that's kinda uncool + c.getDeepConnections(results, id) + }(&wg, pg.ID) } - } - connectionsEntity.Connections = append(connectionsEntity.Connections, entity.Connections...) + wg.Wait() return nil } // GetDeepProcessGroups traverses the process group hierarchy returning information about // this and all child process groups func (c *Client) GetDeepProcessGroups(parentID string) ([]ProcessGroupEntity, error) { - var entity ProcessGroupsEntity - if err := c.getDeepProcessGroups(parentID, &entity); err != nil { - return nil, err + results := make(chan ProcessGroupEntity, 1) + var entities []ProcessGroupEntity + // Results accumelator + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for entity := range results { + entities = append(entities, entity) } - return entity.ProcessGroups, nil - + }() + // Start the tree walk + if err := c.getDeepProcessGroups(results, parentID); err != nil { + log.Errorf("failed to get id %s", parentID, err) + } + close(results) + wg.Wait() + return entities, nil } -func (c *Client) getDeepProcessGroups(parentID string, groupsEntity *ProcessGroupsEntity) error { +func (c *Client) getDeepProcessGroups(results chan ProcessGroupEntity, parentID string) error { var entity ProcessGroupsEntity if err := c.request("/process-groups/"+parentID+"/process-groups", nil, &entity); err != nil { return errors.Trace(err) } - + var wg sync.WaitGroup for _, pg := range entity.ProcessGroups { - if err := c.getDeepProcessGroups(pg.ID, groupsEntity); err != nil { - return err + results <- pg + wg.Add(1) + go func(wg *sync.WaitGroup, id string) { + defer wg.Done() + // FIXME: We aren't collecting errors, that's kinda uncool + c.getDeepProcessGroups(results, id) + }(&wg, pg.ID) } - } - groupsEntity.ProcessGroups = append(groupsEntity.ProcessGroups, entity.ProcessGroups...) + wg.Wait() return nil }